You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2015/01/28 05:20:55 UTC

svn commit: r1655216 - in /hive/branches/llap: ./ common/src/java/org/apache/hadoop/hive/conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ ql/src/java/org/apache/hadoop/hive/ql/io/

Author: gunther
Date: Wed Jan 28 04:20:55 2015
New Revision: 1655216

URL: http://svn.apache.org/r1655216
Log:
HIVE-9460: LLAP: Fix some static vars in the operator pipeline (Gunther Hagleitner)

Modified:
    hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/branches/llap/pom.xml
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
    hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java

Modified: hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/branches/llap/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Jan 28 04:20:55 2015
@@ -1883,6 +1883,10 @@ public class HiveConf extends Configurat
 
     HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet("mr", "tez", "spark"),
         "Chooses execution engine. Options are: mr (Map reduce, default), tez (hadoop 2 only), spark"),
+
+    HIVE_EXECUTION_MODE("hive.execution.mode", "container", new StringSet("container", "llap"),
+        "Chooses whether query fragments will run in container or in llap"),
+
     HIVE_JAR_DIRECTORY("hive.jar.directory", null,
         "This is the location hive in tez mode will look for to find a site wide \n" +
         "installed hive instance."),

Modified: hive/branches/llap/pom.xml
URL: http://svn.apache.org/viewvc/hive/branches/llap/pom.xml?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/pom.xml (original)
+++ hive/branches/llap/pom.xml Wed Jan 28 04:20:55 2015
@@ -154,7 +154,7 @@
     <stax.version>1.0.1</stax.version>
     <slf4j.version>1.7.5</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
-    <tez.version>0.5.2</tez.version>
+    <tez.version>0.7.0-SNAPSHOT</tez.version>
     <super-csv.version>2.2.0</super-csv.version>
     <spark.version>1.2.0</spark.version>
     <scala.binary.version>2.10</scala.binary.version>

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Jan 28 04:20:55 2015
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
@@ -85,6 +86,8 @@ public class MapOperator extends Operato
   private final transient LongWritable recordCounter = new LongWritable();
   protected transient long numRows = 0;
   protected transient long cntr = 1;
+  private final Map<Integer, DummyStoreOperator> connectedOperators
+    = new TreeMap<Integer, DummyStoreOperator>();
 
   // input path --> {operator --> context}
   private final Map<String, Map<Operator<?>, MapOpCtx>> opCtxMap =
@@ -620,7 +623,7 @@ public class MapOperator extends Operato
 
   @Override
   public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
-    return MapRecordProcessor.getConnectOps();
+    return connectedOperators;
   }
 
   public void initializeContexts() {
@@ -634,4 +637,12 @@ public class MapOperator extends Operato
 
     return currentCtxs[0].deserializer;
   }
+
+  public void clearConnectedOperators() {
+    connectedOperators.clear();
+  }
+
+  public void setConnectedOperators(int tag, DummyStoreOperator dummyOp) {
+    connectedOperators.put(tag, dummyOp);
+  }
 }

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ObjectCacheFactory.java Wed Jan 28 04:20:55 2015
@@ -35,7 +35,8 @@ public class ObjectCacheFactory {
    * Returns the appropriate cache
    */
   public static ObjectCache getCache(Configuration conf) {
-    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez") &&
+	HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_MODE).equals("container")) {
       return new org.apache.hadoop.hive.ql.exec.tez.ObjectCache();
     } else {
       return new org.apache.hadoop.hive.ql.exec.mr.ObjectCache();

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Jan 28 04:20:55 2015
@@ -365,7 +365,7 @@ public final class Utilities {
       LOG.info("PLAN PATH = " + path);
       assert path != null;
       if (!gWorkMap.containsKey(path)
-        || HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
+        || !HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("mr")) {
         Path localPath;
         if (conf.getBoolean("mapreduce.task.uberized", false) && name.equals(REDUCE_PLAN_NAME)) {
           localPath = new Path(name);

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Wed Jan 28 04:20:55 2015
@@ -79,8 +79,6 @@ public class MapRecordProcessor extends
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
   private MapWork mapWork;
   List<MapWork> mergeWorkList = null;
-  private static Map<Integer, DummyStoreOperator> connectOps =
-      new TreeMap<Integer, DummyStoreOperator>();
 
   public MapRecordProcessor(JobConf jconf) throws Exception {
     ObjectCache cache = ObjectCacheFactory.getCache(jconf);
@@ -157,7 +155,7 @@ public class MapRecordProcessor extends
         mapOp = new MapOperator();
       }
 
-      connectOps.clear();
+      mapOp.clearConnectedOperators();
       if (mergeWorkList != null) {
         MapOperator mergeMapOp = null;
         for (MapWork mergeMapWork : mergeWorkList) {
@@ -176,7 +174,7 @@ public class MapRecordProcessor extends
             mergeMapOp.setChildren(jconf);
             if (foundCachedMergeWork == false) {
               DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
-              connectOps.put(mergeMapWork.getTag(), dummyOp);
+              mapOp.setConnectedOperators(mergeMapWork.getTag(), dummyOp);
             }
             mergeMapOp.setExecContext(new ExecMapperContext(jconf));
             mergeMapOp.initializeLocalWork(jconf);
@@ -338,10 +336,6 @@ public class MapRecordProcessor extends
     }
   }
 
-  public static Map<Integer, DummyStoreOperator> getConnectOps() {
-    return connectOps;
-  }
-
   private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
     // there should be only one MRInput
     MRInputLegacy theMRInput = null;

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Wed Jan 28 04:20:55 2015
@@ -68,7 +68,7 @@ public class ReduceRecordSource implemen
 
   private boolean abort = false;
 
-  private static Deserializer inputKeyDeserializer;
+  private Deserializer inputKeyDeserializer;
 
   // Input value serde needs to be an array to support different SerDe
   // for different tags

Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1655216&r1=1655215&r2=1655216&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Wed Jan 28 04:20:55 2015
@@ -36,16 +36,19 @@ import org.apache.hadoop.hive.ql.exec.Ut
  */
 public class IOContext {
 
-  /**
-   * Spark uses this thread local
-   */
-  private static final ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
-    @Override
-    protected synchronized IOContext initialValue() { return new IOContext(); }
- };
+    public static String DEFAULT_CONTEXT = "";
+
+    private static final ThreadLocal<Map<String,IOContext>> threadLocal = new ThreadLocal<Map<String,IOContext>>() {
+	@Override
+	protected synchronized Map<String,IOContext> initialValue() {
+		Map<String, IOContext> map = new HashMap<String, IOContext>(); 
+		map.put(DEFAULT_CONTEXT, new IOContext());
+		return map;
+	    }
+    };
 
   private static IOContext get() {
-    return IOContext.threadLocal.get();
+      return IOContext.threadLocal.get().get(DEFAULT_CONTEXT);
   }
 
   /**
@@ -53,12 +56,14 @@ public class IOContext {
    */
   private static final Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
 
-
   public static IOContext get(Configuration conf) {
-    if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
-      return get();
-    }
     String inputName = conf.get(Utilities.INPUT_NAME);
+    Map<String, IOContext> inputNameIOContextMap = threadLocal.get();
+
+    if (inputName == null) {
+	inputName = DEFAULT_CONTEXT;
+    }
+
     if (!inputNameIOContextMap.containsKey(inputName)) {
       IOContext ioContext = new IOContext();
       inputNameIOContextMap.put(inputName, ioContext);
@@ -68,8 +73,7 @@ public class IOContext {
   }
 
   public static void clear() {
-    IOContext.threadLocal.remove();
-    inputNameIOContextMap.clear();
+      threadLocal.remove();
   }
 
   private long currentBlockStart;