Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/08 05:32:02 UTC
svn commit: r1530140 - in /hive/branches/tez: eclipse-templates/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/
ql/src/java/org/apache/hadoop/hive/ql/exec/tez/
ql/src/java/org/apache/hadoop/hive/ql/o...
Author: gunther
Date: Tue Oct 8 03:32:00 2013
New Revision: 1530140
URL: http://svn.apache.org/r1530140
Log:
HIVE-5270: Enable hash joins using tez (Gunther Hagleitner)
Modified:
hive/branches/tez/eclipse-templates/.classpath
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
Modified: hive/branches/tez/eclipse-templates/.classpath
URL: http://svn.apache.org/viewvc/hive/branches/tez/eclipse-templates/.classpath?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/eclipse-templates/.classpath (original)
+++ hive/branches/tez/eclipse-templates/.classpath Tue Oct 8 03:32:00 2013
@@ -50,6 +50,7 @@
<classpathentry kind="lib" path="build/ivy/lib/default/jline-@jline.version@.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/default/json-@json.version@.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/default/commons-compress-@commons-compress.version@.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/default/commons-codec-@commons-codec.version@.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/default/commons-lang-@commons-lang.version@.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/default/commons-logging-@commons-logging.version@.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/default/commons-logging-api-@commons-logging-api.version@.jar"/>
@@ -93,15 +94,15 @@
<classpathentry kind="lib" path="build/ivy/lib/default/tempus-fugit-@tempus-fugit.version@.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/default/tez-common-@tez.version@.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/default/tez-dag-@tez.version@.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/default/tez-dag-api-@tez.version@.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/default/tez-engine-@tez.version@.jar"/>
- <classpathentry kind="lib" path="build/ivy/lib/default/tez-engine-api-@tez.version@.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/default/tez-api-@tez.version@.jar"/>
+ <classpathentry kind="lib" path="build/ivy/lib/default/tez-runtime-library-@tez.version@.jar"/>
<classpathentry kind="lib" path="build/ivy/lib/default/tez-mapreduce-@tez.version@.jar"/>
<classpathentry kind="src" path="build/contrib/test/src"/>
<classpathentry kind="src" path="build/metastore/gen/antlr/gen-java"/>
<classpathentry kind="lib" path="build/testutils/hive-testutils-@HIVE_VERSION@.jar"/>
<classpathentry kind="src" path="build/ql/test/src"/>
<classpathentry kind="src" path="build/ql/gen/antlr/gen-java"/>
+ <classpathentry kind="src" path="build/ql/gen/vector"/>
<classpathentry kind="src" path="beeline/src/java"/>
<classpathentry kind="src" path="beeline/src/test"/>
<classpathentry kind="src" path="cli/src/java"/>
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Tue Oct 8 03:32:00 2013
@@ -68,6 +68,20 @@ public class MapJoinOperator extends Abs
super(mjop);
}
+ /*
+ * We need the base (operator.java) implementation of start/endGroup.
+ * The parent class has functionality in those that map join can't use.
+ */
+ @Override
+ public void endGroup() throws HiveException {
+ defaultEndGroup();
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ defaultStartGroup();
+ }
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
super.initializeOp(hconf);
@@ -126,7 +140,8 @@ public class MapJoinOperator extends Abs
private void loadHashTable() throws HiveException {
- if (!this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+ if (this.getExecContext().getLocalWork() == null
+ || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
if (hashTblInitedOnce) {
return;
} else {
@@ -159,8 +174,8 @@ public class MapJoinOperator extends Abs
public void processOp(Object row, int tag) throws HiveException {
try {
if (firstRow) {
- // generate the map metadata
generateMapMetaData();
+ loadHashTable();
firstRow = false;
}
alias = (byte)tag;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct 8 03:32:00 2013
@@ -515,8 +515,7 @@ public abstract class Operator<T extends
}
}
- // If a operator wants to do some work at the beginning of a group
- public void startGroup() throws HiveException {
+ protected final void defaultStartGroup() throws HiveException {
LOG.debug("Starting group");
if (childOperators == null) {
@@ -535,8 +534,7 @@ public abstract class Operator<T extends
LOG.debug("Start group Done");
}
- // If an operator wants to do some work at the end of a group
- public void endGroup() throws HiveException {
+ protected final void defaultEndGroup() throws HiveException {
LOG.debug("Ending group");
if (childOperators == null) {
@@ -555,6 +553,16 @@ public abstract class Operator<T extends
LOG.debug("End group Done");
}
+ // If an operator wants to do some work at the beginning of a group
+ public void startGroup() throws HiveException {
+ defaultStartGroup();
+ }
+
+ // If an operator wants to do some work at the end of a group
+ public void endGroup() throws HiveException {
+ defaultEndGroup();
+ }
+
// an blocking operator (e.g. GroupByOperator and JoinOperator) can
// override this method to forward its outputs
public void flush() throws HiveException {
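
The MapJoinOperator and Operator hunks above are two halves of the same refactoring: the default group-forwarding logic moves into protected final defaultStartGroup()/defaultEndGroup() methods, the public startGroup()/endGroup() hooks delegate to them, and MapJoinOperator pins itself back to those defaults because the grouping behavior added by its parent class doesn't apply to it. A condensed, self-contained sketch of the pattern (hypothetical class names, exceptions omitted):

// Base operator: default behavior lives in final helpers, hooks stay overridable.
abstract class SketchOperator {
  protected final void defaultStartGroup() {
    // forward startGroup() to all child operators
  }

  protected final void defaultEndGroup() {
    // forward endGroup() to all child operators
  }

  public void startGroup() { defaultStartGroup(); }
  public void endGroup()   { defaultEndGroup(); }
}

// A join-like parent class that overrides the hooks with grouping logic
// the map join can't use.
abstract class SketchCommonJoin extends SketchOperator {
  @Override public void startGroup() { /* buffer rows per group, etc. */ }
  @Override public void endGroup()   { /* flush the buffered group */ }
}

class SketchMapJoin extends SketchCommonJoin {
  // Opt back into the base defaults.
  @Override public void startGroup() { defaultStartGroup(); }
  @Override public void endGroup()   { defaultEndGroup(); }
}
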
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinKey.java Tue Oct 8 03:32:00 2013
@@ -79,21 +79,27 @@ public class MapJoinKey {
return false;
return true;
}
- @SuppressWarnings("unchecked")
- public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
+
+ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
throws IOException, SerDeException {
- SerDe serde = context.getSerDe();
container.readFields(in);
+ read(context, container);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void read(MapJoinObjectSerDeContext context, Writable container) throws SerDeException {
+ SerDe serde = context.getSerDe();
List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+
if(value == null) {
key = EMPTY_OBJECT_ARRAY;
} else {
key = value.toArray();
}
}
-
- public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
+
+ public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
throws IOException, SerDeException {
SerDe serde = context.getSerDe();
ObjectInspector objectInspector = context.getStandardOI();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinRowContainer.java Tue Oct 8 03:32:00 2013
@@ -104,30 +104,34 @@ public class MapJoinRowContainer extends
}
return result;
}
-
- @SuppressWarnings({"unchecked"})
- public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
+
+ public void read(MapJoinObjectSerDeContext context, ObjectInputStream in, Writable container)
throws IOException, SerDeException {
clear();
- SerDe serde = context.getSerDe();
long numRows = in.readLong();
for (long rowIndex = 0L; rowIndex < numRows; rowIndex++) {
- container.readFields(in);
- List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(container),
- serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
- if(value == null) {
- add(toList(EMPTY_OBJECT_ARRAY));
- } else {
- Object[] valuesArray = value.toArray();
- if (context.hasFilterTag()) {
- aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get();
- }
- add(toList(valuesArray));
+ container.readFields(in);
+ read(context, container);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void read(MapJoinObjectSerDeContext context, Writable currentValue) throws SerDeException {
+ SerDe serde = context.getSerDe();
+ List<Object> value = (List<Object>)ObjectInspectorUtils.copyToStandardObject(serde.deserialize(currentValue),
+ serde.getObjectInspector(), ObjectInspectorCopyOption.WRITABLE);
+ if(value == null) {
+ add(toList(EMPTY_OBJECT_ARRAY));
+ } else {
+ Object[] valuesArray = value.toArray();
+ if (context.hasFilterTag()) {
+ aliasFilter &= ((ShortWritable)valuesArray[valuesArray.length - 1]).get();
}
+ add(toList(valuesArray));
}
}
-
- public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
+
+ public void write(MapJoinObjectSerDeContext context, ObjectOutputStream out)
throws IOException, SerDeException {
SerDe serde = context.getSerDe();
ObjectInspector valueObjectInspector = context.getStandardOI();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinTableContainerSerDe.java Tue Oct 8 03:32:00 2013
@@ -40,6 +40,14 @@ public class MapJoinTableContainerSerDe
this.keyContext = keyContext;
this.valueContext = valueContext;
}
+
+ public MapJoinObjectSerDeContext getKeyContext() {
+ return keyContext;
+ }
+ public MapJoinObjectSerDeContext getValueContext() {
+ return valueContext;
+ }
+
@SuppressWarnings({"unchecked"})
public MapJoinTableContainer load(ObjectInputStream in)
throws HiveException {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HashTableLoader.java Tue Oct 8 03:32:00 2013
@@ -17,18 +17,27 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.persistence.HashMapWrapper;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinKey;
+import org.apache.hadoop.hive.ql.exec.persistence.MapJoinRowContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainer;
import org.apache.hadoop.hive.ql.exec.persistence.MapJoinTableContainerSerDe;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.io.Writable;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
/**
* HashTableLoader for Tez constructs the hashtable from records read from
@@ -41,7 +50,6 @@ public class HashTableLoader implements
public HashTableLoader() {
}
- @SuppressWarnings("unused")
@Override
public void load(ExecMapperContext context,
Configuration hconf,
@@ -49,8 +57,43 @@ public class HashTableLoader implements
byte posBigTable,
MapJoinTableContainer[] mapJoinTables,
MapJoinTableContainerSerDe[] mapJoinTableSerdes) throws HiveException {
+
TezContext tezContext = (TezContext) MapredContext.get();
Map<Integer, String> parentToInput = desc.getParentToInput();
+ int hashTableThreshold = HiveConf.getIntVar(hconf, HiveConf.ConfVars.HIVEHASHTABLETHRESHOLD);
+ float hashTableLoadFactor = HiveConf.getFloatVar(hconf,
+ HiveConf.ConfVars.HIVEHASHTABLELOADFACTOR);
+
+ for (int pos = 0; pos < mapJoinTables.length; pos++) {
+ if (pos == posBigTable) {
+ continue;
+ }
+
+ LogicalInput input = tezContext.getInput(parentToInput.get(pos));
+
+ try {
+ KeyValueReader kvReader = (KeyValueReader) input.getReader();
+
+ MapJoinTableContainer tableContainer = new HashMapWrapper(hashTableThreshold,
+ hashTableLoadFactor);
+
+ // simply read all the kv pairs into the hashtable.
+ while (kvReader.next()) {
+ MapJoinKey key = new MapJoinKey();
+ key.read(mapJoinTableSerdes[pos].getKeyContext(), (Writable)kvReader.getCurrentKey());
+ MapJoinRowContainer values = new MapJoinRowContainer();
+ values.read(mapJoinTableSerdes[pos].getValueContext(), (Writable)kvReader.getCurrentValue());
+ tableContainer.put(key, values);
+ }
+
+ mapJoinTables[pos] = tableContainer;
+ } catch (IOException e) {
+ throw new HiveException(e);
+ } catch (SerDeException e) {
+ throw new HiveException(e);
+ } catch (Exception e) {
+ throw new HiveException(e);
+ }
+ }
}
-
}
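
Stripped of the Hive and Tez types, the loader above does nothing more than drain each broadcast input into an in-memory hash table keyed by the join key, one list of small-table rows per key. A minimal sketch of that pattern, using a hypothetical BroadcastReader interface in place of Tez's KeyValueReader:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the key/value reader behind a broadcast edge.
interface BroadcastReader<K, V> {
  boolean next();     // advance; false when the input is exhausted
  K currentKey();
  V currentValue();
}

final class BroadcastHashTableSketch {
  // Read every key/value pair off the broadcast input and group values by key.
  static <K, V> Map<K, List<V>> load(BroadcastReader<K, V> reader) {
    Map<K, List<V>> table = new HashMap<>();
    while (reader.next()) {
      table.computeIfAbsent(reader.currentKey(), k -> new ArrayList<>())
           .add(reader.currentValue());
    }
    return table;
  }
}
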
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Oct 8 03:32:00 2013
@@ -18,20 +18,24 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.ObjectCache;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
@@ -53,6 +57,7 @@ public class MapRecordProcessor extends
private final ExecMapperContext execContext = new ExecMapperContext();
private boolean abort = false;
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
+ private MapWork mapWork;
@Override
void init(JobConf jconf, MRTaskReporter mrReporter, Map<String, LogicalInput> inputs,
@@ -73,15 +78,15 @@ public class MapRecordProcessor extends
execContext.setJc(jconf);
// create map and fetch operators
- MapWork mrwork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
- if (mrwork == null) {
- mrwork = Utilities.getMapWork(jconf);
- cache.cache(MAP_PLAN_KEY, mrwork);
+ mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
+ if (mapWork == null) {
+ mapWork = Utilities.getMapWork(jconf);
+ cache.cache(MAP_PLAN_KEY, mapWork);
}
mapOp = new MapOperator();
// initialize map operator
- mapOp.setConf(mrwork);
+ mapOp.setConf(mapWork);
mapOp.setChildren(jconf);
l4j.info(mapOp.dump(0));
@@ -91,6 +96,17 @@ public class MapRecordProcessor extends
mapOp.initializeLocalWork(jconf);
mapOp.initialize(jconf, null);
+ // Initialization isn't finished until all parents of all operators
+ // are initialized. For broadcast joins that means initializing the
+ // dummy parent operators as well.
+ List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+ if (dummyOps != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ dummyOp.setExecContext(execContext);
+ dummyOp.initialize(jconf, null);
+ }
+ }
+
mapOp.setOutputCollector(out);
mapOp.setReporter(reporter);
MapredContext.get().setReporter(reporter);
@@ -124,10 +140,6 @@ public class MapRecordProcessor extends
@Override
void run() throws IOException{
- if (inputs.size() != 1) {
- throw new IllegalArgumentException("MapRecordProcessor expects single input"
- + ", inputCount=" + inputs.size());
- }
MRInput in = getMRInput(inputs);
KeyValueReader reader = in.getReader();
@@ -186,6 +198,17 @@ public class MapRecordProcessor extends
// detecting failed executions by exceptions thrown by the operator tree
try {
mapOp.close(abort);
+
+ // Need to close the dummyOps as well. The operator pipeline
+ // is not considered "closed/done" unless all operators are
+ // done. For broadcast joins that includes the dummy parents.
+ List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+ if (dummyOps != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ dummyOp.close(abort);
+ }
+ }
+
if (isLogInfoEnabled) {
logCloseInfo();
}
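
The two comments in this hunk describe a lifecycle rule that also drives the ReduceRecordProcessor change further down: the operator pipeline only counts as initialized once every root operator, including the HashTableDummy parents feeding broadcast joins, has been initialized, and it only counts as closed once they have all been closed. Roughly (hypothetical types, record processing elided):

import java.util.Collections;
import java.util.List;

interface SketchOp {
  void initialize();
  void close(boolean abort);
}

final class ProcessorLifecycleSketch {
  static void run(SketchOp mainOp, List<SketchOp> dummyOps, boolean abort) {
    if (dummyOps == null) {
      dummyOps = Collections.emptyList();
    }
    mainOp.initialize();
    for (SketchOp dummy : dummyOps) {   // dummy roots must be initialized too
      dummy.initialize();
    }

    // ... feed records through mainOp ...

    mainOp.close(abort);
    for (SketchOp dummy : dummyOps) {   // and closed, or the pipeline never finishes
      dummy.close(abort);
    }
  }
}
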
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Oct 8 03:32:00 2013
@@ -25,6 +25,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.ObjectCache;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -77,6 +79,8 @@ public class ReduceRecordProcessor exte
private Object keyObject = null;
private BytesWritable groupKey;
+ private ReduceWork redWork;
+
List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
@Override
@@ -90,7 +94,7 @@ public class ReduceRecordProcessor exte
ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector keyObjectInspector;
- ReduceWork redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
+ redWork = (ReduceWork) cache.retrieve(REDUCE_PLAN_KEY);
if (redWork == null) {
redWork = Utilities.getReduceWork(jconf);
cache.cache(REDUCE_PLAN_KEY, redWork);
@@ -134,6 +138,18 @@ public class ReduceRecordProcessor exte
try {
l4j.info(reducer.dump(0));
reducer.initialize(jconf, rowObjectInspector);
+
+ // Initialization isn't finished until all parents of all operators
+ // are initialized. For broadcast joins that means initializing the
+ // dummy parent operators as well.
+ List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
+ if (dummyOps != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ dummyOp.setExecContext(execContext);
+ dummyOp.initialize(jconf, null);
+ }
+ }
+
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
@@ -153,10 +169,6 @@ public class ReduceRecordProcessor exte
@Override
void run() throws IOException{
- if (inputs.size() != 1) {
- throw new IllegalArgumentException("ReduceRecordProcessor expects single input"
- + ", inputCount=" + inputs.size());
- }
//TODO - changes this for joins
ShuffledMergedInput in = (ShuffledMergedInput)inputs.values().iterator().next();
@@ -299,6 +311,16 @@ public class ReduceRecordProcessor exte
}
reducer.close(abort);
+
+ // Need to close the dummyOps as well. The operator pipeline
+ // is not considered "closed/done" unless all operators are
+ // done. For broadcast joins that includes the dummy parents.
+ List<HashTableDummyOperator> dummyOps = redWork.getDummyOps();
+ if (dummyOps != null) {
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ dummyOp.close(abort);
+ }
+ }
reportStats rps = new reportStats(reporter);
reducer.preorderMap(rps);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Tue Oct 8 03:32:00 2013
@@ -527,6 +527,7 @@ abstract public class AbstractSMBJoinPro
SortBucketJoinProcCtx joinContext,
ParseContext parseContext) throws SemanticException {
MapJoinOperator mapJoinOp = MapJoinProcessor.convertMapJoin(
+ parseContext.getConf(),
parseContext.getOpParseCtx(),
joinOp,
pGraphContext.getJoinContext().get(joinOp),
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Tue Oct 8 03:32:00 2013
@@ -95,7 +95,7 @@ public class ConvertJoinMapJoin implemen
long inputSize = currInputStat.getNumberOfBytes();
if ((bigInputStat == null) ||
- ((bigInputStat != null) &&
+ ((bigInputStat != null) &&
(inputSize > bigInputStat.getNumberOfBytes()))) {
if (bigTableFound) {
@@ -141,7 +141,7 @@ public class ConvertJoinMapJoin implemen
}
if (bigTablePosition == -1) {
- // all tables have size 0. We let the suffle join handle this case.
+ // all tables have size 0. We let the shuffle join handle this case.
return null;
}
@@ -161,7 +161,7 @@ public class ConvertJoinMapJoin implemen
// convert to a map join operator with this information
ParseContext parseContext = context.parseContext;
MapJoinOperator mapJoinOp = MapJoinProcessor.
- convertJoinOpMapJoinOp(parseContext.getOpParseCtx(),
+ convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true, false);
Operator<? extends OperatorDesc> parentBigTableOp
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Tue Oct 8 03:32:00 2013
@@ -237,13 +237,14 @@ public class MapJoinProcessor implements
* @return the alias to the big table
* @throws SemanticException
*/
- public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
+ public static String genMapJoinOpAndLocalWork(HiveConf conf, MapredWork newWork,
+ JoinOperator op, int mapJoinPos)
throws SemanticException {
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
newWork.getMapWork().getOpParseCtxMap();
QBJoinTree newJoinTree = newWork.getMapWork().getJoinTree();
// generate the map join operator; already checked the map join
- MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
+ MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(conf, opParseCtxMap, op,
newJoinTree, mapJoinPos, true, false);
return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
}
@@ -316,7 +317,7 @@ public class MapJoinProcessor implements
* are cached in memory
* @param noCheckOuterJoin
*/
- public static MapJoinOperator convertMapJoin(
+ public static MapJoinOperator convertMapJoin(HiveConf conf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
boolean validateMapJoinTree)
@@ -374,7 +375,7 @@ public class MapJoinProcessor implements
}
// create the map-join operator
- MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(opParseCtxMap,
+ MapJoinOperator mapJoinOp = convertJoinOpMapJoinOp(conf, opParseCtxMap,
op, joinTree, mapJoinPos, noCheckOuterJoin, validateMapJoinTree);
@@ -395,7 +396,7 @@ public class MapJoinProcessor implements
return mapJoinOp;
}
- public static MapJoinOperator convertJoinOpMapJoinOp(
+ public static MapJoinOperator convertJoinOpMapJoinOp(HiveConf hconf,
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin,
boolean validateMapJoinTree)
@@ -433,9 +434,6 @@ public class MapJoinProcessor implements
if (src != null) {
Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
assert parentOp.getParentOperators().size() == 1;
- Operator<? extends OperatorDesc> grandParentOp =
- parentOp.getParentOperators().get(0);
-
oldReduceSinkParentOps.add(parentOp);
}
pos++;
@@ -536,8 +534,8 @@ public class MapJoinProcessor implements
}
List<String> outputColumnNames = op.getConf().getOutputColumnNames();
- TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
- .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+ TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
+ PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
JoinCondDesc[] joinCondns = op.getConf().getConds();
MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
valueTableDescs, valueFiltedTableDescs, outputColumnNames, mapJoinPos, joinCondns,
@@ -589,14 +587,14 @@ public class MapJoinProcessor implements
* are cached in memory
* @param noCheckOuterJoin
*/
- public static MapJoinOperator convertSMBJoinToMapJoin(
+ public static MapJoinOperator convertSMBJoinToMapJoin(HiveConf hconf,
Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
throws SemanticException {
// Create a new map join operator
SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
List<ExprNodeDesc> keyCols = smbJoinDesc.getKeys().get(Byte.valueOf((byte) 0));
- TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
+ TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf, PlanUtils
.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
MapJoinDesc mapJoinDesc = new MapJoinDesc(smbJoinDesc.getKeys(),
keyTableDesc, smbJoinDesc.getExprs(),
@@ -644,8 +642,8 @@ public class MapJoinProcessor implements
LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap = pctx
.getOpParseCtx();
- MapJoinOperator mapJoinOp = convertMapJoin(opParseCtxMap, op, joinTree, mapJoinPos,
- noCheckOuterJoin, true);
+ MapJoinOperator mapJoinOp = convertMapJoin(pctx.getConf(), opParseCtxMap, op,
+ joinTree, mapJoinPos, noCheckOuterJoin, true);
// create a dummy select to select all columns
genSelectPlan(pctx, mapJoinOp);
return mapJoinOp;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Tue Oct 8 03:32:00 2013
@@ -2,21 +2,29 @@ package org.apache.hadoop.hive.ql.optimi
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.RowSchema;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.lib.NodeProcessor;
import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
import org.apache.hadoop.hive.ql.parse.GenTezProcContext;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.HashTableDummyDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
import org.apache.hadoop.hive.ql.plan.TezWork.EdgeType;
@@ -51,6 +59,8 @@ public class ReduceSinkMapJoinProc imple
context.mapJoinParentMap.put(mapJoinOp, parents);
}
+ BaseWork myWork = null;
+
while (childOp != null) {
if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof FileSinkOperator)) {
/*
@@ -63,9 +73,9 @@ public class ReduceSinkMapJoinProc imple
*
*/
- BaseWork myWork = context.operatorWorkMap.get(childOp);
+ myWork = context.operatorWorkMap.get(childOp);
BaseWork parentWork = context.operatorWorkMap.get(parentRS);
-
+
// set the link between mapjoin and parent vertex
int pos = context.mapJoinParentMap.get(mapJoinOp).indexOf(parentRS);
if (pos == -1) {
@@ -97,8 +107,56 @@ public class ReduceSinkMapJoinProc imple
}
}
+ // create the dummy operators
+ List<Operator<? extends OperatorDesc>> dummyOperators =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+
+ // create a new operator: HashTableDummyOperator, which shares the table desc
+ HashTableDummyDesc desc = new HashTableDummyDesc();
+ @SuppressWarnings("unchecked")
+ HashTableDummyOperator dummyOp = (HashTableDummyOperator) OperatorFactory.get(desc);
+ TableDesc tbl;
+
+ // need to create the correct table descriptor for key/value
+ RowSchema rowSchema = parentRS.getParentOperators().get(0).getSchema();
+ tbl = PlanUtils.getReduceValueTableDesc(PlanUtils.getFieldSchemasFromRowSchema(rowSchema, ""));
+ dummyOp.getConf().setTbl(tbl);
+
+ Map<Byte, List<ExprNodeDesc>> keyExprMap = mapJoinOp.getConf().getKeys();
+ List<ExprNodeDesc> keyCols = keyExprMap.get(Byte.valueOf((byte) 0));
+ StringBuffer keyOrder = new StringBuffer();
+ for (ExprNodeDesc k: keyCols) {
+ keyOrder.append("+");
+ }
+ TableDesc keyTableDesc = PlanUtils.getReduceKeyTableDesc(PlanUtils
+ .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"), keyOrder.toString());
+ mapJoinOp.getConf().setKeyTableDesc(keyTableDesc);
+
+ // let the dummy op be the parent of mapjoin op
+ mapJoinOp.replaceParent(parentRS, dummyOp);
+ List<Operator<? extends OperatorDesc>> dummyChildren =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ dummyChildren.add(mapJoinOp);
+ dummyOp.setChildOperators(dummyChildren);
+ dummyOperators.add(dummyOp);
+
// cut the operator tree so as to not retain connections from the parent RS downstream
- parentRS.removeChild(mapJoinOp);
+ List<Operator<? extends OperatorDesc>> childOperators = parentRS.getChildOperators();
+ int childIndex = childOperators.indexOf(mapJoinOp);
+ childOperators.remove(childIndex);
+
+ // the "work" needs to know about the dummy operators. They have to be separately initialized
+ // at task startup
+ if (myWork != null) {
+ myWork.addDummyOp(dummyOp);
+ } else {
+ List<Operator<?>> dummyList = dummyOperators;
+ if (context.linkChildOpWithDummyOp.containsKey(childOp)) {
+ dummyList = context.linkChildOpWithDummyOp.get(childOp);
+ }
+ dummyList.add(dummyOp);
+ context.linkChildOpWithDummyOp.put(childOp, dummyList);
+ }
return true;
}
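
What this processor does to the operator tree for each small-table ReduceSinkOperator can be summarized as: cut the RS -> MapJoin edge (the rows now travel over a broadcast Tez edge instead), and splice a HashTableDummyOperator in as the map join's parent at that position so the join still sees a root operator carrying the right key/value schema. A toy sketch of that rewiring (hypothetical Op type):

import java.util.ArrayList;
import java.util.List;

final class RewireSketch {
  static class Op {
    final List<Op> parents = new ArrayList<>();
    final List<Op> children = new ArrayList<>();
  }

  static void spliceDummyParent(Op parentRS, Op mapJoin, Op dummy) {
    int pos = mapJoin.parents.indexOf(parentRS);
    mapJoin.parents.set(pos, dummy);    // mapJoinOp.replaceParent(parentRS, dummyOp)
    dummy.children.add(mapJoin);        // dummyOp.setChildOperators([mapJoinOp])
    parentRS.children.remove(mapJoin);  // cut the RS -> MapJoin link
  }
}
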
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinTaskDispatcher.java Tue Oct 8 03:32:00 2013
@@ -189,7 +189,8 @@ public class CommonJoinTaskDispatcher ex
// optimize this newWork given the big table position
String bigTableAlias =
- MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, bigTablePosition);
+ MapJoinProcessor.genMapJoinOpAndLocalWork(physicalContext.getParseContext().getConf(),
+ newWork, newJoinOp, bigTablePosition);
return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/SortMergeJoinTaskDispatcher.java Tue Oct 8 03:32:00 2013
@@ -432,7 +432,8 @@ public class SortMergeJoinTaskDispatcher
opParseContextMap.put(newSMBJoinOp, opParseContextMap.get(oldSMBJoinOp));
// generate the map join operator
- return MapJoinProcessor.convertSMBJoinToMapJoin(opParseContextMap, newSMBJoinOp,
+ return MapJoinProcessor.convertSMBJoinToMapJoin(physicalContext.getConf(),
+ opParseContextMap, newSMBJoinOp,
joinTree, mapJoinPos, true);
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezProcContext.java Tue Oct 8 03:32:00 2013
@@ -94,6 +94,9 @@ public class GenTezProcContext implement
// what position in the mapjoin the different parent work items will have.
public final Map<MapJoinOperator, List<Operator<?>>> mapJoinParentMap;
+ // remember the dummy ops we created
+ public final Map<Operator<?>, List<Operator<?>>> linkChildOpWithDummyOp;
+
@SuppressWarnings("unchecked")
public GenTezProcContext(HiveConf conf, ParseContext parseContext,
List<Task<MoveWork>> moveTask, List<Task<? extends Serializable>> rootTasks,
@@ -111,5 +114,6 @@ public class GenTezProcContext implement
this.linkOpWithWorkMap = new HashMap<Operator<?>, List<BaseWork>>();
this.operatorWorkMap = new HashMap<Operator<?>, BaseWork>();
this.mapJoinParentMap = new HashMap<MapJoinOperator, List<Operator<?>>>();
+ this.linkChildOpWithDummyOp = new HashMap<Operator<?>, List<Operator<?>>>();
}
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Tue Oct 8 03:32:00 2013
@@ -24,6 +24,7 @@ import java.util.Stack;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
@@ -118,7 +119,7 @@ public class GenTezWork implements NodeP
GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
// remember which parent belongs to which tag
- reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
+ reduceWork.getTagToInput().put(reduceSink.getConf().getTag(),
context.preceedingWork.getName());
tezWork.add(reduceWork);
@@ -151,12 +152,12 @@ public class GenTezWork implements NodeP
ReduceSinkOperator rs = (ReduceSinkOperator) operator;
ReduceWork rWork = (ReduceWork) followingWork;
GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
-
+
// remember which parent belongs to which tag
rWork.getTagToInput().put(rs.getConf().getTag(), work.getName());
// add dependency between the two work items
- tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator),
+ tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator),
EdgeType.SIMPLE_EDGE);
}
@@ -200,6 +201,11 @@ public class GenTezWork implements NodeP
context.operatorWorkMap.put(operator, work);
List<BaseWork> linkWorkList = context.linkOpWithWorkMap.get(operator);
if (linkWorkList != null) {
+ if (context.linkChildOpWithDummyOp.containsKey(operator)) {
+ for (Operator<?> dummy: context.linkChildOpWithDummyOp.get(operator)) {
+ work.addDummyOp((HashTableDummyOperator) dummy);
+ }
+ }
for (BaseWork parentWork : linkWorkList) {
tezWork.connect(parentWork, work, EdgeType.BROADCAST_EDGE);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/BaseWork.java Tue Oct 8 03:32:00 2013
@@ -19,10 +19,10 @@
package org.apache.hadoop.hive.ql.plan;
import java.util.ArrayList;
-import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
/**
@@ -32,6 +32,12 @@ import org.apache.hadoop.hive.ql.exec.Op
@SuppressWarnings({"serial", "deprecation"})
public abstract class BaseWork extends AbstractOperatorDesc {
+ // dummyOps is a reference to all the HashTableDummy operators in the
+ // plan. These have to be separately initialized when we setup a task.
+ // Their function is mainly as root ops to give the mapjoin the correct
+ // schema info.
+ List<HashTableDummyOperator> dummyOps;
+
public BaseWork() {}
public BaseWork(String name) {
@@ -58,6 +64,21 @@ public abstract class BaseWork extends A
this.name = name;
}
+ public List<HashTableDummyOperator> getDummyOps() {
+ return dummyOps;
+ }
+
+ public void setDummyOps(List<HashTableDummyOperator> dummyOps) {
+ this.dummyOps = dummyOps;
+ }
+
+ public void addDummyOp(HashTableDummyOperator dummyOp) {
+ if (dummyOps == null) {
+ dummyOps = new LinkedList<HashTableDummyOperator>();
+ }
+ dummyOps.add(dummyOp);
+ }
+
protected abstract List<Operator<?>> getAllRootOperators();
public List<Operator<?>> getAllOperators() {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java?rev=1530140&r1=1530139&r2=1530140&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java Tue Oct 8 03:32:00 2013
@@ -29,7 +29,9 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
@@ -376,14 +378,34 @@ public final class PlanUtils {
/**
* Generate the table descriptor for Map-side join key.
*/
- public static TableDesc getMapJoinKeyTableDesc(List<FieldSchema> fieldSchemas) {
- return new TableDesc(SequenceFileInputFormat.class,
- SequenceFileOutputFormat.class, Utilities.makeProperties("columns",
- MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas),
- "columns.types", MetaStoreUtils
- .getColumnTypesFromFieldSchema(fieldSchemas),
- serdeConstants.ESCAPE_CHAR, "\\",
- serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName()));
+ public static TableDesc getMapJoinKeyTableDesc(Configuration conf,
+ List<FieldSchema> fieldSchemas) {
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_OPTIMIZE_TEZ)) {
+ // In tez we use a different way of transmitting the hash table.
+ // We basically use ReduceSinkOperators and set the transfer to
+ // be broadcast (instead of partitioned). As a consequence we use
+ // a different SerDe than in the MR mapjoin case.
+ StringBuffer order = new StringBuffer();
+ for (FieldSchema f: fieldSchemas) {
+ order.append("+");
+ }
+ return new TableDesc(
+ SequenceFileInputFormat.class, SequenceFileOutputFormat.class,
+ Utilities.makeProperties(serdeConstants.LIST_COLUMNS, MetaStoreUtils
+ .getColumnNamesFromFieldSchema(fieldSchemas),
+ serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils
+ .getColumnTypesFromFieldSchema(fieldSchemas),
+ serdeConstants.SERIALIZATION_SORT_ORDER, order.toString(),
+ serdeConstants.SERIALIZATION_LIB, BinarySortableSerDe.class.getName()));
+ } else {
+ return new TableDesc(SequenceFileInputFormat.class,
+ SequenceFileOutputFormat.class, Utilities.makeProperties("columns",
+ MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas),
+ "columns.types", MetaStoreUtils
+ .getColumnTypesFromFieldSchema(fieldSchemas),
+ serdeConstants.ESCAPE_CHAR, "\\",
+ serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName()));
+ }
}
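
The effect of the Tez branch above is that the map-join key is described with BinarySortableSerDe and an all-ascending sort order (one '+' per key column), in line with what the broadcasting ReduceSinkOperator emits, while the MR branch keeps the LazyBinarySerDe descriptor it always used. The sort-order string is built the same trivial way as in the ReduceSinkMapJoinProc hunk above; a sketch:

import java.util.List;

final class SortOrderSketch {
  // One '+' per key column: every join key column is treated as ascending.
  static String ascendingOrder(List<?> keyCols) {
    StringBuilder order = new StringBuilder();
    for (int i = 0; i < keyCols.size(); i++) {
      order.append('+');
    }
    return order.toString();
  }
}
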
/**
@@ -391,13 +413,14 @@ public final class PlanUtils {
*/
public static TableDesc getMapJoinValueTableDesc(
List<FieldSchema> fieldSchemas) {
- return new TableDesc(SequenceFileInputFormat.class,
- SequenceFileOutputFormat.class, Utilities.makeProperties("columns",
- MetaStoreUtils.getColumnNamesFromFieldSchema(fieldSchemas),
- "columns.types", MetaStoreUtils
- .getColumnTypesFromFieldSchema(fieldSchemas),
- serdeConstants.ESCAPE_CHAR, "\\",
- serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName()));
+ return new TableDesc(SequenceFileInputFormat.class,
+ SequenceFileOutputFormat.class, Utilities.makeProperties(
+ serdeConstants.LIST_COLUMNS, MetaStoreUtils
+ .getColumnNamesFromFieldSchema(fieldSchemas),
+ serdeConstants.LIST_COLUMN_TYPES, MetaStoreUtils
+ .getColumnTypesFromFieldSchema(fieldSchemas),
+ serdeConstants.ESCAPE_CHAR, "\\",
+ serdeConstants.SERIALIZATION_LIB,LazyBinarySerDe.class.getName()));
}
/**