You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/09/14 08:58:50 UTC

svn commit: r1170453 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java

Author: nzhang
Date: Wed Sep 14 06:58:49 2011
New Revision: 1170453

URL: http://svn.apache.org/viewvc?rev=1170453&view=rev
Log:
HIVE-2440. make hive mapper initialize faster when having tons of input files (Yongqiang He via Ning Zhang)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1170453&r1=1170452&r2=1170453&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Sep 14 06:58:49 2011
@@ -86,7 +86,8 @@ public class MapOperator extends Operato
 
   private Map<Operator<? extends Serializable>, java.util.ArrayList<String>> operatorToPaths;
 
-  private final java.util.ArrayList<String> childrenPaths = new ArrayList<String>();
+  private final Map<Operator<? extends Serializable>, MapOpCtx> childrenOpToOpCtxMap = 
+    new HashMap<Operator<? extends Serializable>, MapOpCtx>();
 
   private ArrayList<Operator<? extends Serializable>> extraChildrenToClose = null;
 
@@ -123,7 +124,10 @@ public class MapOperator extends Operato
 
     @Override
     public int hashCode() {
-      return (op == null) ? 0 : op.hashCode();
+      int ret = (path == null) ? 0 : path.hashCode();
+      ret += (alias == null) ? 0 : alias.hashCode();
+      ret += (op == null) ? 0 : op.hashCode();
+      return ret;
     }
 
     public Operator<? extends Serializable> getOp() {
@@ -358,7 +362,6 @@ public class MapOperator extends Operato
     statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
 
     try {
-      boolean done = false;
       for (String onefile : conf.getPathToAliases().keySet()) {
         MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile);
         Path onepath = new Path(new Path(onefile).toUri().getPath());
@@ -381,12 +384,11 @@ public class MapOperator extends Operato
           // Operator
           if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
             children.add(op);
-            childrenPaths.add(onefile);
+            childrenOpToOpCtxMap.put(op, opCtx);
             LOG.info("dump " + op.getName() + " "
                 + opCtxMap.get(inp).getRowObjectInspector().getTypeName());
           }
           setInspectorInput(inp);
-          done = true;
         }
       }
       
@@ -410,8 +412,20 @@ public class MapOperator extends Operato
     // set that parent initialization is done and call initialize on children
     state = State.INIT;
     List<Operator<? extends Serializable>> children = getChildOperators();
-    Path fpath = new Path((new Path(HiveConf.getVar(hconf,
-        HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
+    
+    for (Entry<Operator<? extends Serializable>, MapOpCtx> entry : childrenOpToOpCtxMap
+        .entrySet()) {
+      Operator<? extends Serializable> child = entry.getKey();
+      MapOpCtx mapOpCtx = entry.getValue();
+      // Add the table name and partition name to the hadoop conf so that
+      // the child operator initialized below (and, in turn, its own
+      // children) will inherit these
+      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME,
+          mapOpCtx.tableName);
+      HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME,
+          mapOpCtx.partName);
+      child.initialize(hconf, new ObjectInspector[] {mapOpCtx.getRowObjectInspector()});
+    }
 
     for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
       // Add alias, table name, and partitions to hadoop conf so that their
@@ -430,20 +444,6 @@ public class MapOperator extends Operato
           extraChildrenToClose = new ArrayList<Operator<? extends Serializable>>();
         }
         extraChildrenToClose.add(op);
-      }
-
-      // multiple input paths may corresponding the same operator (tree). The
-      // below logic is to avoid initialize one operator multiple times if there
-      // is one input path in this mapper's input paths.
-      boolean shouldInit = true;
-      List<String> paths = operatorToPaths.get(op);
-      for (String path : paths) {
-        if (childrenPaths.contains(path) && !path.equals(input.path)) {
-          shouldInit = false;
-          break;
-        }
-      }
-      if (shouldInit) {
         op.initialize(hconf, new ObjectInspector[] {entry.getValue().getRowObjectInspector()});
       }
     }