You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2015/01/28 05:20:55 UTC
svn commit: r1655216 - in /hive/branches/llap: ./
common/src/java/org/apache/hadoop/hive/conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
ql/src/java/org/apache/hadoop/hive/ql/io/
Author: gunther
Date: Wed Jan 28 04:20:55 2015
New Revision: 1655216
URL: http://svn.apache.org/r1655216
Log:
HIVE-9460: LLAP: Fix some static vars in the operator pipeline (Gunther Hagleitner)
Modified:
hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/branches/llap/pom.xml
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 28 04:20:55 2015
@@ -1883,6 +1883,10 @@ public class HiveConf extends Configurat
HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet("mr", "tez", "spark"),
"Chooses execution engine. Options are: mr (Map reduce, default), tez (hadoop 2 only), spark"),
+
+ HIVE_EXECUTION_MODE("hive.execution.mode", "container", new StringSet("container", "llap"),
+ "Chooses whether query fragments will run in container or in llap"),
+
HIVE_JAR_DIRECTORY("hive.jar.directory", null,
"This is the location hive in tez mode will look for to find a site wide \n" +
"installed hive instance."),
Modified: hive/branches/llap/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/pom.xml?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/pom.xml (original)
+++ hive/branches/llap/pom.xml Wed Jan 28 04:20:55 2015
@@ -154,7 +154,7 @@
<stax.version>1.0.1</stax.version>
<slf4j.version>1.7.5</slf4j.version>
<ST4.version>4.0.4</ST4.version>
- <tez.version>0.5.2</tez.version>
+ <tez.version>0.7.0-SNAPSHOT</tez.version>
<super-csv.version>2.2.0</super-csv.version>
<spark.version>1.2.0</spark.version>
<scala.binary.version>2.10</scala.binary.version>
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Jan 28 04:20:55 2015
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
@@ -85,6 +86,8 @@ public class MapOperator extends Operato
private final transient LongWritable recordCounter = new LongWritable();
protected transient long numRows = 0;
protected transient long cntr = 1;
+ private final Map<Integer, DummyStoreOperator> connectedOperators
+ = new TreeMap<Integer, DummyStoreOperator>();
// input path --> {operator --> context}
private final Map<String, Map<Operator<?>, MapOpCtx>> opCtxMap =
@@ -620,7 +623,7 @@ public class MapOperator extends Operato
@Override
public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
- return MapRecordProcessor.getConnectOps();
+ return connectedOperators;
}
public void initializeContexts() {
@@ -634,4 +637,12 @@ public class MapOperator extends Operato
return currentCtxs[0].deserializer;
}
+
+ public void clearConnectedOperators() {
+ connectedOperators.clear();
+ }
+
+ public void setConnectedOperators(int tag, DummyStoreOperator dummyOp) {
+ connectedOperators.put(tag, dummyOp);
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java Wed Jan 28 04:20:55 2015
@@ -35,7 +35,8 @@ public class ObjectCacheFactory {
* Returns the appropriate cache
*/
public static ObjectCache getCache(Configuration conf) {
- if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") &&
+ HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("container")) {
return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
} else {
return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Jan 28 04:20:55 2015
@@ -365,7 +365,7 @@ public final class Utilities {
LOG.info("PLAN PATH = " + path);
assert path != null;
if (!gWorkMap.containsKey(path)
- || HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+ || !HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr")) {
Path localPath;
if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
localPath = new Path(name);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Jan 28 04:20:55 2015
@@ -79,8 +79,6 @@ public class MapRecordProcessor extends
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapWork mapWork;
List<MapWork> mergeWorkList = null;
- private static Map<Integer, DummyStoreOperator> connectOps =
- new TreeMap<Integer, DummyStoreOperator>();
public MapRecordProcessor(JobConf jconf) throws Exception {
ObjectCache cache = ObjectCacheFactory.getCache(jconf);
@@ -157,7 +155,7 @@ public class MapRecordProcessor extends
mapOp = new MapOperator();
}
- connectOps.clear();
+ mapOp.clearConnectedOperators();
if (mergeWorkList != null) {
MapOperator mergeMapOp = null;
for (MapWork mergeMapWork : mergeWorkList) {
@@ -176,7 +174,7 @@ public class MapRecordProcessor extends
mergeMapOp.setChildren(jconf);
if (foundCachedMergeWork == false) {
DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
- connectOps.put(mergeMapWork.getTag(), dummyOp);
+ mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp);
}
mergeMapOp.setExecContext(new ExecMapperContext(jconf));
mergeMapOp.initializeLocalWork(jconf);
@@ -338,10 +336,6 @@ public class MapRecordProcessor extends
}
}
- public static Map<Integer, DummyStoreOperator> getConnectOps() {
- return connectOps;
- }
-
private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
// there should be only one MRInput
MRInputLegacy theMRInput = null;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Jan 28 04:20:55 2015
@@ -68,7 +68,7 @@ public class ReduceRecordSource implemen
private boolean abort = false;
- private static Deserializer inputKeyDeserializer;
+ private Deserializer inputKeyDeserializer;
// Input value serde needs to be an array to support different SerDe
// for different tags
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Wed Jan 28 04:20:55 2015
@@ -36,16 +36,19 @@ import org.apache.hadoop.hive.ql.exec.Ut
*/
public class IOContext {
- /**
- * Spark uses this thread local
- */
- private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
- @Override
- protected synchronized IOContext initialValue() { return new IOContext(); }
- };
+ public static String DEFAULT_CONTEXT = "";
+
+ private static final ThreadLocal<Map<String,IOContext>> threadLocal = new ThreadLocal<Map<String,IOContext>>() {
+ @Override
+ protected synchronized Map<String,IOContext> initialValue() {
+ Map<String, IOContext> map = new HashMap<String, IOContext>();
+ map.put(DEFAULT_CONTEXT, new IOContext());
+ return map;
+ }
+ };
private static IOContext get() {
- return IOContext.threadLocal.get();
+ return IOContext.threadLocal.get().get(DEFAULT_CONTEXT);
}
/**
@@ -53,12 +56,14 @@ public class IOContext {
*/
private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
-
public static IOContext get(Configuration conf) {
- if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
- return get();
- }
String inputName = conf.get(Utilities.INPUT_NAME);
+ Map<String, IOContext> inputNameIOContextMap = threadLocal.get();
+
+ if (inputName == null) {
+ inputName = DEFAULT_CONTEXT;
+ }
+
if (!inputNameIOContextMap.containsKey(inputName)) {
IOContext ioContext = new IOContext();
inputNameIOContextMap.put(inputName, ioContext);
@@ -68,8 +73,7 @@ public class IOContext {
}
public static void clear() {
- IOContext.threadLocal.remove();
- inputNameIOContextMap.clear();
+ threadLocal.remove();
}
private long currentBlockStart;