You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by jx...@apache.org on 2015/08/22 20:24:34 UTC
hive git commit: HIVE-11586:
ObjectInspectorFactory.getReflectionObjectInspector is not thread-safe (Jimmy,
reviewed by Szehon, Xuefu)
Repository: hive
Updated Branches:
refs/heads/master 3c1eae0c4 -> e2d148ba8
HIVE-11586: ObjectInspectorFactory.getReflectionObjectInspector is not thread-safe (Jimmy, reviewed by Szehon, Xuefu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e2d148ba
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e2d148ba
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e2d148ba
Branch: refs/heads/master
Commit: e2d148ba83a81fb03303bd3120b693759a2cd0ff
Parents: 3c1eae0
Author: Jimmy Xiang <jx...@cloudera.com>
Authored: Mon Aug 17 10:08:09 2015 -0700
Committer: Jimmy Xiang <jx...@cloudera.com>
Committed: Sat Aug 22 11:16:59 2015 -0700
----------------------------------------------------------------------
.../objectinspector/ObjectInspectorFactory.java | 61 +++++++++++++----
.../ReflectionStructObjectInspector.java | 60 ++++++++++++++---
.../ThriftUnionObjectInspector.java | 28 +++++---
.../TestReflectionObjectInspectors.java | 71 +++++++++++++++++++-
4 files changed, 185 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e2d148ba/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
index 97bb715..2b3fded 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ObjectInspectorFactory.java
@@ -23,6 +23,7 @@ import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -61,14 +62,38 @@ public final class ObjectInspectorFactory {
JAVA, THRIFT, PROTOCOL_BUFFERS, AVRO
};
- private static ConcurrentHashMap<Type, ObjectInspector> objectInspectorCache = new ConcurrentHashMap<Type, ObjectInspector>();
+ static ConcurrentHashMap<Type, ObjectInspector> objectInspectorCache = new ConcurrentHashMap<Type, ObjectInspector>();
public static ObjectInspector getReflectionObjectInspector(Type t,
ObjectInspectorOptions options) {
+ return getReflectionObjectInspector(t, options, true);
+ }
+
+ static ObjectInspector getReflectionObjectInspector(Type t,
+ ObjectInspectorOptions options, boolean ensureInited) {
ObjectInspector oi = objectInspectorCache.get(t);
if (oi == null) {
- oi = getReflectionObjectInspectorNoCache(t, options);
- objectInspectorCache.put(t, oi);
+ oi = getReflectionObjectInspectorNoCache(t, options, ensureInited);
+ ObjectInspector prev = objectInspectorCache.putIfAbsent(t, oi);
+ if (prev != null) {
+ oi = prev;
+ }
+ }
+ if (ensureInited && oi instanceof ReflectionStructObjectInspector) {
+ ReflectionStructObjectInspector soi = (ReflectionStructObjectInspector) oi;
+ synchronized (soi) {
+ HashSet<Type> checkedTypes = new HashSet<Type>();
+ while (!soi.isFullyInited(checkedTypes)) {
+ try {
+ // Wait for up to 3 seconds, then check again whether initialization failed.
+ // Init should be fast when there is no error, so this need not be configurable.
+ soi.wait(3000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Interrupted while waiting for "
+ + soi.getClass().getName() + " to initialize", e);
+ }
+ }
+ }
}
verifyObjectInspector(options, oi, ObjectInspectorOptions.JAVA, new Class[]{ThriftStructObjectInspector.class,
ProtocolBuffersStructObjectInspector.class});
@@ -88,10 +113,10 @@ public final class ObjectInspectorFactory {
* @param classes ObjectInspector should not be of these types
*/
private static void verifyObjectInspector(ObjectInspectorOptions option, ObjectInspector oi,
- ObjectInspectorOptions checkOption, Class[] classes) {
+ ObjectInspectorOptions checkOption, Class<?>[] classes) {
if (option.equals(checkOption)) {
- for (Class checkClass : classes) {
+ for (Class<?> checkClass : classes) {
if (oi.getClass().equals(checkClass)) {
throw new RuntimeException(
"Cannot call getObjectInspectorByReflection with more then one of " +
@@ -102,11 +127,11 @@ public final class ObjectInspectorFactory {
}
private static ObjectInspector getReflectionObjectInspectorNoCache(Type t,
- ObjectInspectorOptions options) {
+ ObjectInspectorOptions options, boolean ensureInited) {
if (t instanceof GenericArrayType) {
GenericArrayType at = (GenericArrayType) t;
return getStandardListObjectInspector(getReflectionObjectInspector(at
- .getGenericComponentType(), options));
+ .getGenericComponentType(), options, ensureInited));
}
if (t instanceof ParameterizedType) {
@@ -115,14 +140,14 @@ public final class ObjectInspectorFactory {
if (List.class.isAssignableFrom((Class<?>) pt.getRawType()) ||
Set.class.isAssignableFrom((Class<?>) pt.getRawType())) {
return getStandardListObjectInspector(getReflectionObjectInspector(pt
- .getActualTypeArguments()[0], options));
+ .getActualTypeArguments()[0], options, ensureInited));
}
// Map?
if (Map.class.isAssignableFrom((Class<?>) pt.getRawType())) {
return getStandardMapObjectInspector(getReflectionObjectInspector(pt
- .getActualTypeArguments()[0], options),
+ .getActualTypeArguments()[0], options, ensureInited),
getReflectionObjectInspector(pt.getActualTypeArguments()[1],
- options));
+ options, ensureInited));
}
// Otherwise convert t to RawType so we will fall into the following if
// block.
@@ -186,8 +211,20 @@ public final class ObjectInspectorFactory {
// put it into the cache BEFORE it is initialized to make sure we can catch
// recursive types.
- objectInspectorCache.put(t, oi);
- oi.init(c, options);
+ ReflectionStructObjectInspector prev =
+ (ReflectionStructObjectInspector) objectInspectorCache.putIfAbsent(t, oi);
+ if (prev != null) {
+ oi = prev;
+ } else {
+ try {
+ oi.init(t, c, options);
+ } finally {
+ if (!oi.inited) {
+ // Failed to init, remove it from cache
+ objectInspectorCache.remove(t, oi);
+ }
+ }
+ }
return oi;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e2d148ba/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
index 78e6066..22f8051 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ReflectionStructObjectInspector.java
@@ -19,9 +19,11 @@
package org.apache.hadoop.hive.serde2.objectinspector;
import java.lang.reflect.Field;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.util.ReflectionUtils;
@@ -81,6 +83,8 @@ public class ReflectionStructObjectInspector extends
Class<?> objectClass;
List<MyField> fields;
+ volatile boolean inited = false;
+ volatile Type type;
public Category getCategory() {
return Category.STRUCT;
@@ -113,12 +117,44 @@ public class ReflectionStructObjectInspector extends
}
/**
+ * Check if this inspector and all its field inspectors are initialized.
+ */
+ protected boolean isFullyInited(Set<Type> checkedTypes) {
+ if (type != null && // when type is not set, init hasn't been called yet
+ ObjectInspectorFactory.objectInspectorCache.get(type) != this) {
+ // This object should be the same as the one in the cache; otherwise, it must have been removed due to an init error
+ throw new RuntimeException("Cached object inspector is gone while waiting for it to initialize");
+ }
+
+ if (!inited) {
+ return false;
+ }
+
+ // We don't want to check types already checked
+ checkedTypes.add(type);
+
+ // This inspector is initialized, but we still need
+ // to check whether all field inspectors are initialized
+ for (StructField field: getAllStructFieldRefs()) {
+ ObjectInspector oi = field.getFieldObjectInspector();
+ if (oi instanceof ReflectionStructObjectInspector) {
+ ReflectionStructObjectInspector soi = (ReflectionStructObjectInspector) oi;
+ if (!checkedTypes.contains(soi.type) && !soi.isFullyInited(checkedTypes)) {
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
* This method is only intended to be used by Utilities class in this package.
* The reason that this method is not recursive by itself is because we want
* to allow recursive types.
*/
- protected void init(Class<?> objectClass,
+ protected void init(Type type, Class<?> objectClass,
ObjectInspectorFactory.ObjectInspectorOptions options) {
+ this.type = type;
verifyObjectClassType(objectClass);
this.objectClass = objectClass;
@@ -126,16 +162,20 @@ public class ReflectionStructObjectInspector extends
Field[] reflectionFields = ObjectInspectorUtils
.getDeclaredNonStaticFields(objectClass);
- fields = new ArrayList<MyField>(structFieldObjectInspectors.size());
- int used = 0;
- for (int i = 0; i < reflectionFields.length; i++) {
- if (!shouldIgnoreField(reflectionFields[i].getName())) {
- reflectionFields[i].setAccessible(true);
- fields.add(new MyField(i, reflectionFields[i], structFieldObjectInspectors
- .get(used++)));
+ synchronized (this) {
+ fields = new ArrayList<MyField>(structFieldObjectInspectors.size());
+ int used = 0;
+ for (int i = 0; i < reflectionFields.length; i++) {
+ if (!shouldIgnoreField(reflectionFields[i].getName())) {
+ reflectionFields[i].setAccessible(true);
+ fields.add(new MyField(i, reflectionFields[i], structFieldObjectInspectors
+ .get(used++)));
+ }
}
+ assert (fields.size() == structFieldObjectInspectors.size());
+ inited = true;
+ notifyAll();
}
- assert (fields.size() == structFieldObjectInspectors.size());
}
// ThriftStructObjectInspector will override and ignore __isset fields.
@@ -215,7 +255,7 @@ public class ReflectionStructObjectInspector extends
for (int i = 0; i < fields.length; i++) {
if (!shouldIgnoreField(fields[i].getName())) {
structFieldObjectInspectors.add(ObjectInspectorFactory.getReflectionObjectInspector(fields[i]
- .getGenericType(), options));
+ .getGenericType(), options, false));
}
}
return structFieldObjectInspectors;
http://git-wip-us.apache.org/repos/asf/hive/blob/e2d148ba/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java
index 600abbb..8593a41 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/objectinspector/ThriftUnionObjectInspector.java
@@ -34,6 +34,7 @@ import com.google.common.primitives.UnsignedBytes;
* Always use the ObjectInspectorFactory to create new ObjectInspector objects,
* instead of directly creating an instance of this class.
*/
+@SuppressWarnings("unchecked")
public class ThriftUnionObjectInspector extends ReflectionStructObjectInspector implements UnionObjectInspector {
private static final String FIELD_METADATA_MAP = "metaDataMap";
@@ -73,8 +74,10 @@ public class ThriftUnionObjectInspector extends ReflectionStructObjectInspector
* to allow recursive types.
*/
@Override
- protected void init(Class<?> objectClass,
+ protected void init(Type type, Class<?> objectClass,
ObjectInspectorFactory.ObjectInspectorOptions options) {
+ this.type = type;
+
verifyObjectClassType(objectClass);
this.objectClass = objectClass;
final Field fieldMetaData;
@@ -89,15 +92,18 @@ public class ThriftUnionObjectInspector extends ReflectionStructObjectInspector
try {
final Map<? extends TFieldIdEnum, FieldMetaData> fieldMap = (Map<? extends TFieldIdEnum, FieldMetaData>) fieldMetaData.get(null);
- fields = new ArrayList<StandardStructObjectInspector.MyField>(fieldMap.size());
- this.ois = new ArrayList<ObjectInspector>();
- for(Map.Entry<? extends TFieldIdEnum, FieldMetaData> metadata : fieldMap.entrySet()) {
- int fieldId = metadata.getKey().getThriftFieldId();
- String fieldName = metadata.getValue().fieldName;
- final Type fieldType = ThriftObjectInspectorUtils.getFieldType(objectClass, fieldName);
- final ObjectInspector reflectionObjectInspector = ObjectInspectorFactory.getReflectionObjectInspector(fieldType, options);
- fields.add(new StandardStructObjectInspector.MyField(fieldId, fieldName, reflectionObjectInspector));
- this.ois.add(reflectionObjectInspector);
+ synchronized (this) {
+ fields = new ArrayList<StandardStructObjectInspector.MyField>(fieldMap.size());
+ this.ois = new ArrayList<ObjectInspector>();
+ for(Map.Entry<? extends TFieldIdEnum, FieldMetaData> metadata : fieldMap.entrySet()) {
+ int fieldId = metadata.getKey().getThriftFieldId();
+ String fieldName = metadata.getValue().fieldName;
+ final Type fieldType = ThriftObjectInspectorUtils.getFieldType(objectClass, fieldName);
+ final ObjectInspector reflectionObjectInspector = ObjectInspectorFactory.getReflectionObjectInspector(fieldType, options, false);
+ fields.add(new StandardStructObjectInspector.MyField(fieldId, fieldName, reflectionObjectInspector));
+ this.ois.add(reflectionObjectInspector);
+ }
+ inited = true;
}
} catch (IllegalAccessException e) {
throw new RuntimeException("Unable to find field metadata for thrift union field ", e);
@@ -110,7 +116,7 @@ public class ThriftUnionObjectInspector extends ReflectionStructObjectInspector
}
@Override
- public List<? extends StructField> getAllStructFieldRefs() {
+ public synchronized List<? extends StructField> getAllStructFieldRefs() {
return fields;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/e2d148ba/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java
----------------------------------------------------------------------
diff --git a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java
index e2408c6..c14366a 100644
--- a/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java
+++ b/serde/src/test/org/apache/hadoop/hive/serde2/objectinspector/TestReflectionObjectInspectors.java
@@ -17,15 +17,24 @@
*/
package org.apache.hadoop.hive.serde2.objectinspector;
+import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
-import junit.framework.TestCase;
-
+import org.apache.commons.lang.mutable.MutableObject;
+import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.thrift.test.Complex;
+
+import junit.framework.TestCase;
/**
* TestReflectionObjectInspectors.
@@ -100,4 +109,62 @@ public class TestReflectionObjectInspectors extends TestCase {
throw e;
}
}
+
+ public void testObjectInspectorThreadSafety() throws InterruptedException {
+ final int workerCount = 5; // 5 workers to run getReflectionObjectInspector concurrently
+ final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(workerCount);
+ final MutableObject exception = new MutableObject();
+ Thread runner = new Thread(new Runnable() {
+ @Override
+ @SuppressWarnings("unchecked")
+ public void run() {
+ Future<ObjectInspector>[] results = (Future<ObjectInspector>[])new Future[workerCount];
+ ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions>[] types =
+ (ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions>[])new ObjectPair[] {
+ new ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions>(Complex.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.THRIFT),
+ new ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions>(MyStruct.class,
+ ObjectInspectorFactory.ObjectInspectorOptions.JAVA),
+ };
+ try {
+ for (int i = 0; i < 20; i++) { // repeat 20 times
+ for (final ObjectPair<Type, ObjectInspectorFactory.ObjectInspectorOptions> t: types) {
+ ObjectInspectorFactory.objectInspectorCache.clear();
+ for (int k = 0; k < workerCount; k++) {
+ results[k] = executorService.schedule(new Callable<ObjectInspector>() {
+ @Override
+ public ObjectInspector call() throws Exception {
+ return ObjectInspectorFactory.getReflectionObjectInspector(
+ t.getFirst(), t.getSecond());
+ }
+ }, 50, TimeUnit.MILLISECONDS);
+ }
+ ObjectInspector oi = results[0].get();
+ for (int k = 1; k < workerCount; k++) {
+ assertEquals(oi, results[k].get());
+ }
+ }
+ }
+ } catch (Throwable e) {
+ exception.setValue(e);
+ }
+ }
+ });
+ try {
+ runner.start();
+ long endTime = System.currentTimeMillis() + 300000; // timeout in 5 minutes
+ while (runner.isAlive()) {
+ if (System.currentTimeMillis() > endTime) {
+ runner.interrupt(); // Interrupt the runner thread
+ fail("Timed out waiting for the runner to finish");
+ }
+ runner.join(10000);
+ }
+ if (exception.getValue() != null) {
+ fail("Got exception: " + exception.getValue());
+ }
+ } finally {
+ executorService.shutdownNow();
+ }
+ }
}