You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by nz...@apache.org on 2011/09/14 08:58:50 UTC
svn commit: r1170453 -
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
Author: nzhang
Date: Wed Sep 14 06:58:49 2011
New Revision: 1170453
URL: http://svn.apache.org/viewvc?rev=1170453&view=rev
Log:
HIVE-2440. Make the Hive mapper initialize faster when there are many input files (Yongqiang He via Ning Zhang)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1170453&r1=1170452&r2=1170453&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Wed Sep 14 06:58:49 2011
@@ -86,7 +86,8 @@ public class MapOperator extends Operato
private Map<Operator<? extends Serializable>, java.util.ArrayList<String>> operatorToPaths;
- private final java.util.ArrayList<String> childrenPaths = new ArrayList<String>();
+ private final Map<Operator<? extends Serializable>, MapOpCtx> childrenOpToOpCtxMap =
+ new HashMap<Operator<? extends Serializable>, MapOpCtx>();
private ArrayList<Operator<? extends Serializable>> extraChildrenToClose = null;
@@ -123,7 +124,10 @@ public class MapOperator extends Operato
@Override
public int hashCode() {
- return (op == null) ? 0 : op.hashCode();
+ int ret = (path == null) ? 0 : path.hashCode();
+ ret += (alias == null) ? 0 : alias.hashCode();
+ ret += (op == null) ? 0 : op.hashCode();
+ return ret;
}
public Operator<? extends Serializable> getOp() {
@@ -358,7 +362,6 @@ public class MapOperator extends Operato
statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
try {
- boolean done = false;
for (String onefile : conf.getPathToAliases().keySet()) {
MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile);
Path onepath = new Path(new Path(onefile).toUri().getPath());
@@ -381,12 +384,11 @@ public class MapOperator extends Operato
// Operator
if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
children.add(op);
- childrenPaths.add(onefile);
+ childrenOpToOpCtxMap.put(op, opCtx);
LOG.info("dump " + op.getName() + " "
+ opCtxMap.get(inp).getRowObjectInspector().getTypeName());
}
setInspectorInput(inp);
- done = true;
}
}
@@ -410,8 +412,20 @@ public class MapOperator extends Operato
// set that parent initialization is done and call initialize on children
state = State.INIT;
List<Operator<? extends Serializable>> children = getChildOperators();
- Path fpath = new Path((new Path(HiveConf.getVar(hconf,
- HiveConf.ConfVars.HADOOPMAPFILENAME))).toUri().getPath());
+
+ for (Entry<Operator<? extends Serializable>, MapOpCtx> entry : childrenOpToOpCtxMap
+ .entrySet()) {
+ Operator<? extends Serializable> child = entry.getKey();
+ MapOpCtx mapOpCtx = entry.getValue();
+ // Add alias, table name, and partitions to hadoop conf so that their
+ // children will
+ // inherit these
+ HiveConf.setVar(hconf, HiveConf.ConfVars.HIVETABLENAME,
+ mapOpCtx.tableName);
+ HiveConf.setVar(hconf, HiveConf.ConfVars.HIVEPARTITIONNAME,
+ mapOpCtx.partName);
+ child.initialize(hconf, new ObjectInspector[] {mapOpCtx.getRowObjectInspector()});
+ }
for (Entry<MapInputPath, MapOpCtx> entry : opCtxMap.entrySet()) {
// Add alias, table name, and partitions to hadoop conf so that their
@@ -430,20 +444,6 @@ public class MapOperator extends Operato
extraChildrenToClose = new ArrayList<Operator<? extends Serializable>>();
}
extraChildrenToClose.add(op);
- }
-
- // multiple input paths may corresponding the same operator (tree). The
- // below logic is to avoid initialize one operator multiple times if there
- // is one input path in this mapper's input paths.
- boolean shouldInit = true;
- List<String> paths = operatorToPaths.get(op);
- for (String path : paths) {
- if (childrenPaths.contains(path) && !path.equals(input.path)) {
- shouldInit = false;
- break;
- }
- }
- if (shouldInit) {
op.initialize(hconf, new ObjectInspector[] {entry.getValue().getRowObjectInspector()});
}
}