You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jx...@apache.org on 2015/08/22 20:24:34 UTC

hive git commit: HIVE-11586: ObjectInspectorFactory.getReflectionObjectInspector is not thread-safe (Jimmy, reviewed by Szehon, Xuefu)

Repository: hive
Updated Branches:
  refs/heads/master 3c1eae0c4 -> e2d148ba8


HIVE-11586: ObjectInspectorFactory.getReflectionObjectInspector is not thread-safe (Jimmy, reviewed by Szehon, Xuefu)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e2d148ba
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e2d148ba
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e2d148ba

Branch: refs/heads/master
Commit: e2d148ba83a81fb03303bd3120b693759a2cd0ff
Parents: 3c1eae0
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Mon Aug 17 10:08:09 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Sat Aug 22 11:16:59 2015 -0700

----------------------------------------------------------------------
 .../objectinspector/ObjectInspectorFactory.java | 61 +++++++++++++----
 .../ReflectionStructObjectInspector.java        | 60 ++++++++++++++---
 .../ThriftUnionObjectInspector.java             | 28 +++++---
 .../TestReflectionObjectInspectors.java         | 71 +++++++++++++++++++-
 4 files changed, 185 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e2d148ba/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
index 97bb715..2b3fded 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
@@ -23,6 +23,7 @@ import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -61,14 +62,38 @@ public final class ObjectInspectorFactory {
     JAVA, THRIFT, PROTOCOL_BUFFERS, AVRO
   };
 
-  private static ConcurrentHashMap<Type, ObjectInspector> objectInspectorCache = new ConcurrentHashMap<Type, ObjectInspector>();
+  static ConcurrentHashMap<Type, ObjectInspector> objectInspectorCache = new ConcurrentHashMap<Type, ObjectInspector>();
 
   public static ObjectInspector getReflectionObjectInspector(Type t,
       ObjectInspectorOptions options) {
+    return getReflectionObjectInspector(t, options, true);
+  }
+
+  static ObjectInspector getReflectionObjectInspector(Type t,
+      ObjectInspectorOptions options, boolean ensureInited) {
     ObjectInspector oi = objectInspectorCache.get(t);
     if (oi == null) {
-      oi = getReflectionObjectInspectorNoCache(t, options);
-      objectInspectorCache.put(t, oi);
+      oi = getReflectionObjectInspectorNoCache(t, options, ensureInited);
+      ObjectInspector prev = objectInspectorCache.putIfAbsent(t, oi);
+      if (prev != null) {
+        oi = prev;
+      }
+    }
+    if (ensureInited && oi instanceof ReflectionStructObjectInspector) {
+      ReflectionStructObjectInspector soi = (ReflectionStructObjectInspector) oi;
+      synchronized (soi) {
+        HashSet<Type> checkedTypes = new HashSet<Type>();
+        while (!soi.isFullyInited(checkedTypes)) {
+          try {
+            // Wait for up to 3 seconds before checking if any init error.
+            // Init should be fast if no error, no need to make this configurable.
+            soi.wait(3000);
+          } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for "
+              + soi.getClass().getName() + " to initialize", e);
+          }
+        }
+      }
     }
     verifyObjectInspector(options, oi, ObjectInspectorOptions.JAVA, new Class[]{ThriftStructObjectInspector.class,
       ProtocolBuffersStructObjectInspector.class});
@@ -88,10 +113,10 @@ public final class ObjectInspectorFactory {
    * @param classes ObjectInspector should not be of these types
    */
   private static void verifyObjectInspector(ObjectInspectorOptions option, ObjectInspector oi,
-      ObjectInspectorOptions checkOption, Class[] classes) {
+      ObjectInspectorOptions checkOption, Class<?>[] classes) {
 
     if (option.equals(checkOption)) {
-      for (Class checkClass : classes) {
+      for (Class<?> checkClass : classes) {
         if (oi.getClass().equals(checkClass)) {
           throw new RuntimeException(
             "Cannot call getObjectInspectorByReflection with more then one of " +
@@ -102,11 +127,11 @@ public final class ObjectInspectorFactory {
   }
 
   private static ObjectInspector getReflectionObjectInspectorNoCache(Type t,
-      ObjectInspectorOptions options) {
+      ObjectInspectorOptions options, boolean ensureInited) {
     if (t instanceof GenericArrayType) {
       GenericArrayType at = (GenericArrayType) t;
       return getStandardListObjectInspector(getReflectionObjectInspector(at
-          .getGenericComponentType(), options));
+          .getGenericComponentType(), options, ensureInited));
     }
 
     if (t instanceof ParameterizedType) {
@@ -115,14 +140,14 @@ public final class ObjectInspectorFactory {
       if (List.class.isAssignableFrom((Class<?>) pt.getRawType()) ||
           Set.class.isAssignableFrom((Class<?>) pt.getRawType())) {
         return getStandardListObjectInspector(getReflectionObjectInspector(pt
-            .getActualTypeArguments()[0], options));
+            .getActualTypeArguments()[0], options, ensureInited));
       }
       // Map?
       if (Map.class.isAssignableFrom((Class<?>) pt.getRawType())) {
         return getStandardMapObjectInspector(getReflectionObjectInspector(pt
-            .getActualTypeArguments()[0], options),
+            .getActualTypeArguments()[0], options, ensureInited),
             getReflectionObjectInspector(pt.getActualTypeArguments()[1],
-            options));
+            options, ensureInited));
       }
       // Otherwise convert t to RawType so we will fall into the following if
       // block.
@@ -186,8 +211,20 @@ public final class ObjectInspectorFactory {
 
     // put it into the cache BEFORE it is initialized to make sure we can catch
     // recursive types.
-    objectInspectorCache.put(t, oi);
-    oi.init(c, options);
+    ReflectionStructObjectInspector prev =
+        (ReflectionStructObjectInspector) objectInspectorCache.putIfAbsent(t, oi);
+    if (prev != null) {
+      oi = prev;
+    } else {
+      try {
+        oi.init(t, c, options);
+      } finally {
+        if (!oi.inited) {
+          // Failed to init, remove it from cache
+          objectInspectorCache.remove(t, oi);
+        }
+      }
+    }
     return oi;
 
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/e2d148ba/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
index 78e6066..22f8051 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
@@ -19,9 +19,11 @@
 package org.apache.hadoop.hive.serde2.objectinspector;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -81,6 +83,8 @@ public class ReflectionStructObjectInspector extends
 
   Class<?> objectClass;
   List<MyField> fields;
+  volatile boolean inited = false;
+  volatile Type type;
 
   public Category getCategory() {
     return Category.STRUCT;
@@ -113,12 +117,44 @@ public class ReflectionStructObjectInspector extends
   }
 
   /**
+   * Check if this inspector and all its field inspectors are initialized.
+   */
+  protected boolean isFullyInited(Set<Type> checkedTypes) {
+    if (type != null && // when type is not set, init hasn't been called yet
+        ObjectInspectorFactory.objectInspectorCache.get(type) != this) {
+      // This object should be the same as in cache, otherwise, it must be removed due to init error
+      throw new RuntimeException("Cached object inspector is gone while waiting for it to initialize");
+    }
+
+    if (!inited) {
+      return false;
+    }
+
+    // We don't want to check types already checked
+    checkedTypes.add(type);
+
+    // This inspector is initialized, we still need
+    // to check if all field inspectors are initialized
+    for (StructField field: getAllStructFieldRefs()) {
+      ObjectInspector oi = field.getFieldObjectInspector();
+      if (oi instanceof ReflectionStructObjectInspector) {
+        ReflectionStructObjectInspector soi = (ReflectionStructObjectInspector) oi;
+        if (!checkedTypes.contains(soi.type) && !soi.isFullyInited(checkedTypes)) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
    * This method is only intended to be used by Utilities class in this package.
    * The reason that this method is not recursive by itself is because we want
    * to allow recursive types.
    */
-  protected void init(Class<?> objectClass,
+  protected void init(Type type, Class<?> objectClass,
       ObjectInspectorFactory.ObjectInspectorOptions options) {
+    this.type = type;
 
     verifyObjectClassType(objectClass);
     this.objectClass = objectClass;
@@ -126,16 +162,20 @@ public class ReflectionStructObjectInspector extends
 
     Field[] reflectionFields = ObjectInspectorUtils
         .getDeclaredNonStaticFields(objectClass);
-    fields = new ArrayList<MyField>(structFieldObjectInspectors.size());
-    int used = 0;
-    for (int i = 0; i < reflectionFields.length; i++) {
-      if (!shouldIgnoreField(reflectionFields[i].getName())) {
-        reflectionFields[i].setAccessible(true);
-        fields.add(new MyField(i, reflectionFields[i], structFieldObjectInspectors
-            .get(used++)));
+    synchronized (this) {
+      fields = new ArrayList<MyField>(structFieldObjectInspectors.size());
+      int used = 0;
+      for (int i = 0; i < reflectionFields.length; i++) {
+        if (!shouldIgnoreField(reflectionFields[i].getName())) {
+          reflectionFields[i].setAccessible(true);
+          fields.add(new MyField(i, reflectionFields[i], structFieldObjectInspectors
+              .get(used++)));
+        }
       }
+      assert (fields.size() == structFieldObjectInspectors.size());
+      inited = true;
+      notifyAll();
     }
-    assert (fields.size() == structFieldObjectInspectors.size());
   }
 
   // ThriftStructObjectInspector will override and ignore __isset fields.
@@ -215,7 +255,7 @@ public class ReflectionStructObjectInspector extends
     for (int i = 0; i < fields.length; i++) {
       if (!shouldIgnoreField(fields[i].getName())) {
         structFieldObjectInspectors.add(ObjectInspectorFactory.getReflectionObjectInspector(fields[i]
-          .getGenericType(), options));
+          .getGenericType(), options, false));
       }
     }
     return structFieldObjectInspectors;

http://git-wip-us.apache.org/repos/asf/hive/blob/e2d148ba/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java
index 600abbb..8593a41 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java
@@ -34,6 +34,7 @@ import com.google.common.primitives.UnsignedBytes;
  * Always use the ObjectInspectorFactory to create new ObjectInspector objects,
  * instead of directly creating an instance of this class.
  */
+@SuppressWarnings("unchecked")
 public class ThriftUnionObjectInspector extends ReflectionStructObjectInspector implements UnionObjectInspector {
 
   private static final String FIELD_METADATA_MAP = "metaDataMap";
@@ -73,8 +74,10 @@ public class ThriftUnionObjectInspector extends ReflectionStructObjectInspector
    * to allow recursive types.
    */
   @Override
-  protected void init(Class<?> objectClass,
+  protected void init(Type type, Class<?> objectClass,
                       ObjectInspectorFactory.ObjectInspectorOptions options) {
+    this.type = type;
+
     verifyObjectClassType(objectClass);
     this.objectClass = objectClass;
     final Field fieldMetaData;
@@ -89,15 +92,18 @@ public class ThriftUnionObjectInspector extends ReflectionStructObjectInspector
 
     try {
       final Map<? extends TFieldIdEnum, FieldMetaData> fieldMap = (Map<? extends TFieldIdEnum, FieldMetaData>) fieldMetaData.get(null);
-      fields = new ArrayList<StandardStructObjectInspector.MyField>(fieldMap.size());
-      this.ois = new ArrayList<ObjectInspector>();
-      for(Map.Entry<? extends TFieldIdEnum, FieldMetaData> metadata : fieldMap.entrySet()) {
-        int fieldId = metadata.getKey().getThriftFieldId();
-        String fieldName = metadata.getValue().fieldName;
-        final Type fieldType = ThriftObjectInspectorUtils.getFieldType(objectClass, fieldName);
-        final ObjectInspector reflectionObjectInspector = ObjectInspectorFactory.getReflectionObjectInspector(fieldType, options);
-        fields.add(new StandardStructObjectInspector.MyField(fieldId, fieldName, reflectionObjectInspector));
-        this.ois.add(reflectionObjectInspector);
+      synchronized (this) {
+        fields = new ArrayList<StandardStructObjectInspector.MyField>(fieldMap.size());
+        this.ois = new ArrayList<ObjectInspector>();
+        for(Map.Entry<? extends TFieldIdEnum, FieldMetaData> metadata : fieldMap.entrySet()) {
+          int fieldId = metadata.getKey().getThriftFieldId();
+          String fieldName = metadata.getValue().fieldName;
+          final Type fieldType = ThriftObjectInspectorUtils.getFieldType(objectClass, fieldName);
+          final ObjectInspector reflectionObjectInspector = ObjectInspectorFactory.getReflectionObjectInspector(fieldType, options, false);
+          fields.add(new StandardStructObjectInspector.MyField(fieldId, fieldName, reflectionObjectInspector));
+          this.ois.add(reflectionObjectInspector);
+        }
+        inited = true;
       }
     } catch (IllegalAccessException e) {
       throw new RuntimeException("Unable to find field metadata for thrift union field ", e);
@@ -110,7 +116,7 @@ public class ThriftUnionObjectInspector extends ReflectionStructObjectInspector
   }
 
   @Override
-  public List<? extends StructField> getAllStructFieldRefs() {
+  public synchronized List<? extends StructField> getAllStructFieldRefs() {
     return fields;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e2d148ba/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java
index e2408c6..c14366a 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java
@@ -17,15 +17,24 @@
  */
 package org.apache.hadoop.hive.serde2.objectinspector;
 
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 
-import junit.framework.TestCase;
-
+import org.apache.commons.lang.mutable.MutableObject;
+import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.thrift.test.Complex;
+
+import junit.framework.TestCase;
 
 /**
  * TestReflectionObjectInspectors.
@@ -100,4 +109,62 @@ public class TestReflectionObjectInspectors extends TestCase {
       throw e;
     }
   }
+
+  public void testObjectInspectorThreadSafety() throws InterruptedException {
+    final int workerCount = 5; // 5 workers to run getReflectionObjectInspector concurrently
+    final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(workerCount);
+    final MutableObject exception = new MutableObject();
+    Thread runner = new Thread(new Runnable() {
+      @Override
+      @SuppressWarnings("unchecked")
+      public void run() {
+        Future<ObjectInspector>[] results = (Future<ObjectInspector>[])new Future[workerCount];
+        ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions>[] types =
+          (ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions>[])new ObjectPair[] {
+             new ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions>(Complex.class,
+               ObjectInspectorFactory.ObjectInspectorOptions.THRIFT),
+             new ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions>(MyStruct.class,
+               ObjectInspectorFactory.ObjectInspectorOptions.JAVA),
+          };
+        try {
+          for (int i = 0; i < 20; i++) { // repeat 20 times
+            for (final ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions> t: types) {
+              ObjectInspectorFactory.objectInspectorCache.clear();
+              for (int k = 0; k < workerCount; k++) {
+                results[k] = executorService.schedule(new Callable<ObjectInspector>() {
+                  @Override
+                  public ObjectInspector call() throws Exception {
+                    return ObjectInspectorFactory.getReflectionObjectInspector(
+                      t.getFirst(), t.getSecond());
+                  }
+                }, 50, TimeUnit.MILLISECONDS);
+              }
+              ObjectInspector oi = results[0].get();
+              for (int k = 1; k < workerCount; k++) {
+                assertEquals(oi, results[k].get());
+              }
+            }
+          }
+        } catch (Throwable e) {
+          exception.setValue(e);
+        }
+      }
+    });
+    try {
+      runner.start();
+      long endTime = System.currentTimeMillis() + 300000; // timeout in 5 minutes
+      while (runner.isAlive()) {
+        if (System.currentTimeMillis() > endTime) {
+          runner.interrupt(); // Interrupt the runner thread
+          fail("Timed out waiting for the runner to finish");
+        }
+        runner.join(10000);
+      }
+      if (exception.getValue() != null) {
+        fail("Got exception: " + exception.getValue());
+      }
+    } finally {
+      executorService.shutdownNow();
+    }
+  }
 }