You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2014/10/14 21:07:05 UTC
svn commit: r1631841 [8/42] - in /hive/branches/llap: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/mr/
accumulo-handler/src/java/org/apache/hadoop/hive...
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct 14 19:06:45 2014
@@ -146,6 +146,7 @@ public abstract class Operator<T extends
/**
* Implements the getChildren function for the Node Interface.
*/
+ @Override
public ArrayList<Node> getChildren() {
if (getChildOperators() == null) {
@@ -497,8 +498,6 @@ public abstract class Operator<T extends
LOG.debug("Starting group for children:");
for (Operator<? extends OperatorDesc> op : childOperators) {
- op.setGroupKeyObjectInspector(groupKeyOI);
- op.setGroupKeyObject(groupKeyObject);
op.startGroup();
}
@@ -851,6 +850,7 @@ public abstract class Operator<T extends
*
* @return the name of the operator
*/
+ @Override
public String getName() {
return getOperatorName();
}
@@ -968,7 +968,6 @@ public abstract class Operator<T extends
}
protected transient Object groupKeyObject;
- protected transient ObjectInspector groupKeyOI;
public String getOperatorId() {
return operatorId;
@@ -1061,7 +1060,7 @@ public abstract class Operator<T extends
if (parents != null) {
for (Operator<? extends OperatorDesc> parent : parents) {
- parentClones.add((Operator<? extends OperatorDesc>)(parent.clone()));
+ parentClones.add((parent.clone()));
}
}
@@ -1082,8 +1081,8 @@ public abstract class Operator<T extends
public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
T descClone = (T) conf.clone();
Operator<? extends OperatorDesc> ret =
- (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
- descClone, getSchema());
+ OperatorFactory.getAndMakeChild(
+ descClone, getSchema());
return ret;
}
@@ -1254,15 +1253,15 @@ public abstract class Operator<T extends
}
return null;
}
-
+
public OpTraits getOpTraits() {
if (conf != null) {
return conf.getOpTraits();
}
-
+
return null;
}
-
+
public void setOpTraits(OpTraits metaInfo) {
if (LOG.isDebugEnabled()) {
LOG.debug("Setting traits ("+metaInfo+") on "+this);
@@ -1285,21 +1284,23 @@ public abstract class Operator<T extends
}
}
- public void setGroupKeyObjectInspector(ObjectInspector keyObjectInspector) {
- this.groupKeyOI = keyObjectInspector;
- }
-
- public ObjectInspector getGroupKeyObjectInspector() {
- return groupKeyOI;
- }
-
public static Operator createDummy() {
return new DummyOperator();
}
private static class DummyOperator extends Operator {
public DummyOperator() { super("dummy"); }
+ @Override
public void processOp(Object row, int tag) { }
+ @Override
public OperatorType getType() { return null; }
}
+
+ public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
+ if ((parentOperators == null) || (parentOperators.size() == 0)) {
+ return null;
+ }
+ Map<Integer, DummyStoreOperator> dummyOps = parentOperators.get(0).getTagToOperatorTree();
+ return dummyOps;
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Oct 14 19:06:45 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec;
+import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -114,10 +116,16 @@ public final class OperatorFactory {
RCFileMergeOperator.class));
opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
OrcFileMergeOperator.class));
+ opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class,
+ CommonMergeJoinOperator.class));
}
static {
vectorOpvec = new ArrayList<OpTuple>();
+ vectorOpvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
+ VectorAppMasterEventOperator.class));
+ vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
+ VectorAppMasterEventOperator.class));
vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Tue Oct 14 19:06:45 2014
@@ -46,6 +46,9 @@ public class OperatorUtils {
public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
Set<T> found = new HashSet<T>();
for (Operator<?> start : starts) {
+ if (start == null) {
+ continue;
+ }
findOperators(start, clazz, found);
}
return found;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/OrcFileMergeOperator.java Tue Oct 14 19:06:45 2014
@@ -64,6 +64,7 @@ public class OrcFileMergeOperator extend
private void processKeyValuePairs(Object key, Object value)
throws HiveException {
+ String filePath = "";
try {
OrcFileValueWrapper v;
OrcFileKeyWrapper k;
@@ -72,6 +73,7 @@ public class OrcFileMergeOperator extend
} else {
k = (OrcFileKeyWrapper) key;
}
+ filePath = k.getInputPath().toUri().getPath();
fixTmpPath(k.getInputPath().getParent());
@@ -131,6 +133,16 @@ public class OrcFileMergeOperator extend
this.exception = true;
closeOp(true);
throw new HiveException(e);
+ } finally {
+ if (fdis != null) {
+ try {
+ fdis.close();
+ } catch (IOException e) {
+ throw new HiveException(String.format("Unable to close file %s", filePath), e);
+ } finally {
+ fdis = null;
+ }
+ }
}
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Tue Oct 14 19:06:45 2014
@@ -337,17 +337,20 @@ public class PTFOperator extends Operato
handleOutputRows(tabFn.finishPartition());
} else {
if ( tabFn.canIterateOutput() ) {
- outputPartRowsItr = tabFn.iterator(inputPart.iterator());
+ outputPartRowsItr = inputPart == null ? null :
+ tabFn.iterator(inputPart.iterator());
} else {
- outputPart = tabFn.execute(inputPart);
- outputPartRowsItr = outputPart.iterator();
+ outputPart = inputPart == null ? null : tabFn.execute(inputPart);
+ outputPartRowsItr = outputPart == null ? null : outputPart.iterator();
}
if ( next != null ) {
if (!next.isStreaming() && !isOutputIterator() ) {
next.inputPart = outputPart;
} else {
- while(outputPartRowsItr.hasNext() ) {
- next.processRow(outputPartRowsItr.next());
+ if ( outputPartRowsItr != null ) {
+ while(outputPartRowsItr.hasNext() ) {
+ next.processRow(outputPartRowsItr.next());
+ }
}
}
}
@@ -357,8 +360,10 @@ public class PTFOperator extends Operato
next.finishPartition();
} else {
if (!isStreaming() ) {
- while(outputPartRowsItr.hasNext() ) {
- forward(outputPartRowsItr.next(), outputObjInspector);
+ if ( outputPartRowsItr != null ) {
+ while(outputPartRowsItr.hasNext() ) {
+ forward(outputPartRowsItr.next(), outputObjInspector);
+ }
}
}
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Oct 14 19:06:45 2014
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.OutputCollector;
@@ -67,6 +66,9 @@ public class ReduceSinkOperator extends
}
private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
+ private static final boolean isInfoEnabled = LOG.isInfoEnabled();
+ private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+ private static final boolean isTraceEnabled = LOG.isTraceEnabled();
private static final long serialVersionUID = 1L;
private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();
@@ -117,6 +119,8 @@ public class ReduceSinkOperator extends
protected transient Object[] cachedValues;
protected transient List<List<Integer>> distinctColIndices;
protected transient Random random;
+ protected transient int bucketNumber;
+
/**
* This two dimensional array holds key data and a corresponding Union object
* which contains the tag identifying the aggregate expression for distinct columns.
@@ -144,8 +148,14 @@ public class ReduceSinkOperator extends
protected void initializeOp(Configuration hconf) throws HiveException {
try {
List<ExprNodeDesc> keys = conf.getKeyCols();
- LOG.debug("keys size is " + keys.size());
- for (ExprNodeDesc k : keys) LOG.debug("Key exprNodeDesc " + k.getExprString());
+
+ if (isDebugEnabled) {
+ LOG.debug("keys size is " + keys.size());
+ for (ExprNodeDesc k : keys) {
+ LOG.debug("Key exprNodeDesc " + k.getExprString());
+ }
+ }
+
keyEval = new ExprNodeEvaluator[keys.size()];
int i = 0;
for (ExprNodeDesc e : keys) {
@@ -184,7 +194,9 @@ public class ReduceSinkOperator extends
tag = conf.getTag();
tagByte[0] = (byte) tag;
skipTag = conf.getSkipTag();
- LOG.info("Using tag = " + tag);
+ if (isInfoEnabled) {
+ LOG.info("Using tag = " + tag);
+ }
TableDesc keyTableDesc = conf.getKeySerializeInfo();
keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
@@ -284,7 +296,10 @@ public class ReduceSinkOperator extends
bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
}
- LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys());
+ if (isInfoEnabled) {
+ LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " +
+ conf.getNumDistributionKeys());
+ }
keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
distinctColIndices,
conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -304,15 +319,14 @@ public class ReduceSinkOperator extends
populateCachedDistributionKeys(row, 0);
// replace bucketing columns with hashcode % numBuckets
- int buckNum = -1;
if (bucketEval != null) {
- buckNum = computeBucketNumber(row, conf.getNumBuckets());
- cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
+ bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
+ cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber));
} else if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
conf.getWriteType() == AcidUtils.Operation.DELETE) {
// In the non-partitioned case we still want to compute the bucket number for updates and
// deletes.
- buckNum = computeBucketNumber(row, conf.getNumBuckets());
+ bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
}
HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
@@ -328,7 +342,7 @@ public class ReduceSinkOperator extends
if (autoParallel && partitionEval.length > 0) {
hashCode = computeMurmurHash(firstKey);
} else {
- hashCode = computeHashCode(row, buckNum);
+ hashCode = computeHashCode(row);
}
firstKey.setHashCode(hashCode);
@@ -377,7 +391,9 @@ public class ReduceSinkOperator extends
// column directly.
Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
- LOG.debug("Acid choosing bucket number " + buckNum);
+ if (isTraceEnabled) {
+ LOG.trace("Acid choosing bucket number " + buckNum);
+ }
} else {
for (int i = 0; i < bucketEval.length; i++) {
Object o = bucketEval[i].evaluate(row);
@@ -422,7 +438,7 @@ public class ReduceSinkOperator extends
return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
}
- private int computeHashCode(Object row, int buckNum) throws HiveException {
+ private int computeHashCode(Object row) throws HiveException {
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
@@ -446,8 +462,10 @@ public class ReduceSinkOperator extends
+ ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
}
}
- LOG.debug("Going to return hash code " + (keyHashCode * 31 + buckNum));
- return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
+ if (isTraceEnabled) {
+ LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber));
+ }
+ return bucketNumber < 0 ? keyHashCode : keyHashCode * 31 + bucketNumber;
}
private boolean partitionKeysAreNull(Object row) throws HiveException {
@@ -493,10 +511,19 @@ public class ReduceSinkOperator extends
}
private BytesWritable makeValueWritable(Object row) throws Exception {
+ int length = valueEval.length;
+
+ // in case of bucketed table, insert the bucket number as the last column in value
+ if (bucketEval != null) {
+ length -= 1;
+ cachedValues[length] = new Text(String.valueOf(bucketNumber));
+ }
+
// Evaluate the value
- for (int i = 0; i < valueEval.length; i++) {
+ for (int i = 0; i < length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
}
+
// Serialize the value
return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector);
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java Tue Oct 14 19:06:45 2014
@@ -697,6 +697,7 @@ public class SMBMapJoinOperator extends
// But if hive supports assigning bucket number for each partition, this can be vary
public void setupContext(List<Path> paths) throws HiveException {
int segmentLen = paths.size();
+ FetchOperator.setFetchOperatorContext(jobConf, fetchWork.getPartDir());
FetchOperator[] segments = segmentsForSize(segmentLen);
for (int i = 0 ; i < segmentLen; i++) {
Path path = paths.get(i);
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct 14 19:06:45 2014
@@ -27,6 +27,7 @@ import org.antlr.runtime.CommonToken;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.WordUtils;
+import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -87,6 +88,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -199,6 +201,8 @@ public final class Utilities {
public static String HADOOP_LOCAL_FS = "file:///";
public static String MAP_PLAN_NAME = "map.xml";
public static String REDUCE_PLAN_NAME = "reduce.xml";
+ public static String MERGE_PLAN_NAME = "merge.xml";
+ public static final String INPUT_NAME = "iocontext.input.name";
public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
@@ -289,6 +293,39 @@ public final class Utilities {
return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
}
+ public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir,
+ boolean useCache) {
+ for (BaseWork baseWork : mergeJoinWork.getBaseWorkList()) {
+ setBaseWork(conf, baseWork, mrScratchDir, baseWork.getName() + MERGE_PLAN_NAME, useCache);
+ String prefixes = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+ if (prefixes == null) {
+ prefixes = baseWork.getName();
+ } else {
+ prefixes = prefixes + "," + baseWork.getName();
+ }
+ conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES, prefixes);
+ }
+
+ // nothing to return
+ return null;
+ }
+
+ public static BaseWork getMergeWork(JobConf jconf) {
+ if ((jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX) == null)
+ || (jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX).isEmpty())) {
+ return null;
+ }
+ return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX));
+ }
+
+ public static BaseWork getMergeWork(JobConf jconf, String prefix) {
+ if (prefix == null || prefix.isEmpty()) {
+ return null;
+ }
+
+ return getBaseWork(jconf, prefix + MERGE_PLAN_NAME);
+ }
+
public static void cacheBaseWork(Configuration conf, String name, BaseWork work,
Path hiveScratchDir) {
try {
@@ -367,6 +404,8 @@ public final class Utilities {
throw new RuntimeException("unable to determine work from configuration ."
+ MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
}
+ } else if (name.contains(MERGE_PLAN_NAME)) {
+ gWork = deserializePlan(in, MapWork.class, conf);
}
gWorkMap.put(path, gWork);
} else {
@@ -390,6 +429,22 @@ public final class Utilities {
}
}
+ public static Map<String, Map<Integer, String>> getScratchColumnVectorTypes(Configuration hiveConf) {
+ BaseWork baseWork = getMapWork(hiveConf);
+ if (baseWork == null) {
+ baseWork = getReduceWork(hiveConf);
+ }
+ return baseWork.getScratchColumnVectorTypes();
+ }
+
+ public static Map<String, Map<String, Integer>> getScratchColumnMap(Configuration hiveConf) {
+ BaseWork baseWork = getMapWork(hiveConf);
+ if (baseWork == null) {
+ baseWork = getReduceWork(hiveConf);
+ }
+ return baseWork.getScratchColumnMap();
+ }
+
public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
try {
Graph stageGraph = plan.getQueryPlan().getStageGraph();
@@ -583,8 +638,14 @@ public final class Utilities {
}
public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
+ String useName = conf.get(INPUT_NAME);
+ if (useName == null) {
+ useName = "mapreduce";
+ }
+ conf.set(INPUT_NAME, useName);
setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
if (w.getReduceWork() != null) {
+ conf.set(INPUT_NAME, useName);
setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
}
}
@@ -1821,7 +1882,7 @@ public final class Utilities {
for (int i = 0; i < parts.length; ++i) {
assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
- + " is not a direcgtory";
+ + " is not a directory";
FileStatus[] items = fs.listStatus(parts[i].getPath());
// remove empty directory since DP insert should not generate empty partitions.
@@ -2259,13 +2320,15 @@ public final class Utilities {
* configuration which receives configured properties
*/
public static void copyTableJobPropertiesToConf(TableDesc tbl, JobConf job) {
- String bucketString = tbl.getProperties()
- .getProperty(hive_metastoreConstants.BUCKET_COUNT);
- // copy the bucket count
- if (bucketString != null) {
- job.set(hive_metastoreConstants.BUCKET_COUNT, bucketString);
+ Properties tblProperties = tbl.getProperties();
+ for(String name: tblProperties.stringPropertyNames()) {
+ if (job.get(name) == null) {
+ String val = (String) tblProperties.get(name);
+ if (val != null) {
+ job.set(name, StringEscapeUtils.escapeJava(val));
+ }
+ }
}
-
Map<String, String> jobProperties = tbl.getJobProperties();
if (jobProperties == null) {
return;
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Oct 14 19:06:45 2014
@@ -56,6 +56,8 @@ import org.apache.hadoop.hive.ql.exec.Pa
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
@@ -416,6 +418,13 @@ public class ExecDriver extends Task<Map
Utilities.createTmpDirs(job, mWork);
Utilities.createTmpDirs(job, rWork);
+ SessionState ss = SessionState.get();
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+ && ss != null) {
+ TezSessionState session = ss.getTezSession();
+ TezSessionPoolManager.getInstance().close(session, true);
+ }
+
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
// replace it back
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Oct 14 19:06:45 2014
@@ -78,10 +78,11 @@ public class ExecMapper extends MapReduc
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
- private final ExecMapperContext execContext = new ExecMapperContext();
+ private ExecMapperContext execContext = null;
@Override
public void configure(JobConf job) {
+ execContext = new ExecMapperContext(job);
// Allocate the bean at the beginning -
memoryMXBean = ManagementFactory.getMemoryMXBean();
l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -292,6 +293,7 @@ public class ExecMapper extends MapReduc
this.rp = rp;
}
+ @Override
public void func(Operator op) {
Map<Enum<?>, Long> opStats = op.getStats();
for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Tue Oct 14 19:06:45 2014
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.mapred.JobConf;
@@ -60,8 +61,9 @@ public class ExecMapperContext {
this.currentBigBucketFile = currentBigBucketFile;
}
- public ExecMapperContext() {
- ioCxt = IOContext.get();
+ public ExecMapperContext(JobConf jc) {
+ this.jc = jc;
+ ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME));
}
public void clear() {
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Oct 14 19:06:45 2014
@@ -66,6 +66,8 @@ import org.apache.hadoop.util.StringUtil
public class ExecReducer extends MapReduceBase implements Reducer {
private static final Log LOG = LogFactory.getLog("ExecReducer");
+ private static final boolean isInfoEnabled = LOG.isInfoEnabled();
+ private static final boolean isTraceEnabled = LOG.isTraceEnabled();
private static final String PLAN_KEY = "__REDUCE_PLAN__";
// used to log memory usage periodically
@@ -75,7 +77,6 @@ public class ExecReducer extends MapRedu
private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
private final Object[] valueObject = new Object[Byte.MAX_VALUE];
private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
- private final boolean isLogInfoEnabled = LOG.isInfoEnabled();
// TODO: move to DynamicSerDe when it's ready
private Deserializer inputKeyDeserializer;
@@ -101,16 +102,18 @@ public class ExecReducer extends MapRedu
ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector keyObjectInspector;
- LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+ if (isInfoEnabled) {
+ LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
- try {
- LOG.info("conf classpath = "
- + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
- LOG.info("thread classpath = "
- + Arrays.asList(((URLClassLoader) Thread.currentThread()
- .getContextClassLoader()).getURLs()));
- } catch (Exception e) {
- LOG.info("cannot get classpath: " + e.getMessage());
+ try {
+ LOG.info("conf classpath = "
+ + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+ LOG.info("thread classpath = "
+ + Arrays.asList(((URLClassLoader) Thread.currentThread()
+ .getContextClassLoader()).getURLs()));
+ } catch (Exception e) {
+ LOG.info("cannot get classpath: " + e.getMessage());
+ }
}
jc = job;
@@ -147,7 +150,6 @@ public class ExecReducer extends MapRedu
ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
ois.add(keyObjectInspector);
ois.add(valueObjectInspector[tag]);
- reducer.setGroupKeyObjectInspector(keyObjectInspector);
rowObjectInspector[tag] = ObjectInspectorFactory
.getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
}
@@ -202,7 +204,9 @@ public class ExecReducer extends MapRedu
groupKey = new BytesWritable();
} else {
// If a operator wants to do some work at the end of a group
- LOG.trace("End Group");
+ if (isTraceEnabled) {
+ LOG.trace("End Group");
+ }
reducer.endGroup();
}
@@ -217,9 +221,11 @@ public class ExecReducer extends MapRedu
}
groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
- LOG.trace("Start Group");
- reducer.setGroupKeyObject(keyObject);
+ if (isTraceEnabled) {
+ LOG.trace("Start Group");
+ }
reducer.startGroup();
+ reducer.setGroupKeyObject(keyObject);
}
// System.err.print(keyObject.toString());
while (values.hasNext()) {
@@ -239,12 +245,14 @@ public class ExecReducer extends MapRedu
row.clear();
row.add(keyObject);
row.add(valueObject[tag]);
- if (isLogInfoEnabled) {
+ if (isInfoEnabled) {
cntr++;
if (cntr == nextCntr) {
long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- LOG.info("ExecReducer: processing " + cntr
- + " rows: used memory = " + used_memory);
+ if (isInfoEnabled) {
+ LOG.info("ExecReducer: processing " + cntr
+ + " rows: used memory = " + used_memory);
+ }
nextCntr = getNextCntr(cntr);
}
}
@@ -290,17 +298,19 @@ public class ExecReducer extends MapRedu
public void close() {
// No row was processed
- if (oc == null) {
+ if (oc == null && isTraceEnabled) {
LOG.trace("Close called without any rows processed");
}
try {
if (groupKey != null) {
// If a operator wants to do some work at the end of a group
- LOG.trace("End Group");
+ if (isTraceEnabled) {
+ LOG.trace("End Group");
+ }
reducer.endGroup();
}
- if (isLogInfoEnabled) {
+ if (isInfoEnabled) {
LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
+ memoryMXBean.getHeapMemoryUsage().getUsed());
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Tue Oct 14 19:06:45 2014
@@ -91,7 +91,7 @@ public class MapredLocalTask extends Tas
// not sure we need this exec context; but all the operators in the work
// will pass this context throught
- private ExecMapperContext execContext = new ExecMapperContext();
+ private ExecMapperContext execContext = null;
private Process executor;
@@ -113,6 +113,7 @@ public class MapredLocalTask extends Tas
public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
super.initialize(conf, queryPlan, driverContext);
job = new JobConf(conf, ExecDriver.class);
+ execContext = new ExecMapperContext(job);
//we don't use the HadoopJobExecHooks for local tasks
this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
}
@@ -237,6 +238,18 @@ public class MapredLocalTask extends Tas
variables.put(HADOOP_OPTS_KEY, hadoopOpts);
}
+ //For Windows OS, we need to pass HIVE_HADOOP_CLASSPATH Java parameter while starting
+ //Hiveserver2 using "-hiveconf hive.hadoop.classpath=%HIVE_LIB%". This is to combine path(s).
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)!= null)
+ {
+ if (variables.containsKey("HADOOP_CLASSPATH"))
+ {
+ variables.put("HADOOP_CLASSPATH", variables.get("HADOOP_CLASSPATH") + ";" + HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH));
+ } else {
+ variables.put("HADOOP_CLASSPATH", HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH));
+ }
+ }
+
if(variables.containsKey(MapRedTask.HIVE_DEBUG_RECURSIVE)) {
MapRedTask.configureDebugVariablesForChildJVM(variables);
}
@@ -301,6 +314,11 @@ public class MapredLocalTask extends Tas
if (work == null) {
return -1;
}
+
+ if (execContext == null) {
+ execContext = new ExecMapperContext(job);
+ }
+
memoryMXBean = ManagementFactory.getMemoryMXBean();
long startTime = System.currentTimeMillis();
console.printInfo(Utilities.now()
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Tue Oct 14 19:06:45 2014
@@ -31,6 +31,7 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -79,9 +80,14 @@ public class CustomPartitionVertex exten
private List<InputDataInformationEvent> dataInformationEvents;
private int numBuckets = -1;
private Configuration conf = null;
- private boolean rootVertexInitialized = false;
private final SplitGrouper grouper = new SplitGrouper();
private int taskCount = 0;
+ private VertexType vertexType;
+ private String mainWorkName;
+ private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
+
+ private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
+ new HashMap<String, Multimap<Integer, InputSplit>>();
public CustomPartitionVertex(VertexManagerPluginContext context) {
super(context);
@@ -90,8 +96,18 @@ public class CustomPartitionVertex exten
@Override
public void initialize() {
this.context = getContext();
- ByteBuffer byteBuf = context.getUserPayload().getPayload();
- this.numBuckets = byteBuf.getInt();
+ ByteBuffer payload = context.getUserPayload().getPayload();
+ CustomVertexConfiguration vertexConf = new CustomVertexConfiguration();
+ DataInputByteBuffer dibb = new DataInputByteBuffer();
+ dibb.reset(payload);
+ try {
+ vertexConf.readFields(dibb);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.numBuckets = vertexConf.getNumBuckets();
+ this.mainWorkName = vertexConf.getInputName();
+ this.vertexType = vertexConf.getVertexType();
}
@Override
@@ -113,17 +129,12 @@ public class CustomPartitionVertex exten
public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
}
- // One call per root Input - and for now only one is handled.
+ // One call per root Input
@Override
public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
List<Event> events) {
+ LOG.info("On root vertex initialized " + inputName);
- // Ideally, since there's only 1 Input expected at the moment -
- // ensure this method is called only once. Tez will call it once per Root
- // Input.
- Preconditions.checkState(rootVertexInitialized == false);
- LOG.info("Root vertex not initialized");
- rootVertexInitialized = true;
try {
// This is using the payload from the RootVertexInitializer corresponding
// to InputName. Ideally it should be using it's own configuration class -
@@ -164,9 +175,6 @@ public class CustomPartitionVertex exten
// No tasks should have been started yet. Checked by initial state
// check.
Preconditions.checkState(dataInformationEventSeen == false);
- Preconditions
- .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
- "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
// The vertex cannot be configured until all DataEvents are seen - to
@@ -220,21 +228,55 @@ public class CustomPartitionVertex exten
(bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
Multimap<Integer, InputSplit> groupedSplit =
HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
- availableSlots);
+ availableSlots, inputName);
bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
}
- LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks");
- processAllEvents(inputName, bucketToGroupedSplitMap);
+ LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap);
+ if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) {
+ /*
+ * this is the small table side. In case of SMB join, we may need to send each split to the
+ * corresponding bucket-based task on the other side. In case a split needs to go to
+ * multiple downstream tasks, we need to clone the event and send it to the right
+ * destination.
+ */
+ processAllSideEvents(inputName, bucketToGroupedSplitMap);
+ } else {
+ processAllEvents(inputName, bucketToGroupedSplitMap);
+ }
} catch (Exception e) {
throw new RuntimeException(e);
}
}
+ private void processAllSideEvents(String inputName,
+ Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
+ // the bucket to task map should have been set up by the big table.
+ if (bucketToTaskMap.isEmpty()) {
+ inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
+ return;
+ }
+ List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>();
+ for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
+ Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
+ for (Integer task : destTasks) {
+ for (InputSplit split : entry.getValue()) {
+ MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+ InputDataInformationEvent diEvent =
+ InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
+ .toByteString().asReadOnlyByteBuffer());
+ diEvent.setTargetIndex(task);
+ taskEvents.add(diEvent);
+ }
+ }
+ }
+
+ context.addRootInputEvents(inputName, taskEvents);
+ }
+
private void processAllEvents(String inputName,
Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
- Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
List<InputSplit> finalSplits = Lists.newLinkedList();
for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
int bucketNum = entry.getKey();
@@ -248,11 +290,13 @@ public class CustomPartitionVertex exten
// Construct the EdgeManager descriptor to be used by all edges which need
// the routing table.
- EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
- EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
- UserPayload payload = getBytePayload(bucketToTaskMap);
- hiveEdgeManagerDesc.setUserPayload(payload);
-
+ EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null;
+ if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES)
+ || (vertexType == VertexType.INITIALIZED_EDGES)) {
+ hiveEdgeManagerDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+ UserPayload payload = getBytePayload(bucketToTaskMap);
+ hiveEdgeManagerDesc.setUserPayload(payload);
+ }
Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
// Replace the edge manager for all vertices which have routing type custom.
@@ -285,13 +329,21 @@ public class CustomPartitionVertex exten
rootInputSpecUpdate.put(
inputName,
InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
- context.setVertexParallelism(
- taskCount,
- VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
+ if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) {
+ context.setVertexParallelism(
+ taskCount,
+ VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+ .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
+ }
// Set the actual events for the tasks.
context.addRootInputEvents(inputName, taskEvents);
+ if (inputToGroupedSplitMap.isEmpty() == false) {
+ for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
+ processAllSideEvents(entry.getKey(), entry.getValue());
+ }
+ inputToGroupedSplitMap.clear();
+ }
}
UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
@@ -315,7 +367,8 @@ public class CustomPartitionVertex exten
if (!(inputSplit instanceof FileSplit)) {
throw new UnsupportedOperationException(
- "Cannot handle splits other than FileSplit for the moment");
+ "Cannot handle splits other than FileSplit for the moment. Current input split type: "
+ + inputSplit.getClass().getSimpleName());
}
return (FileSplit) inputSplit;
}
@@ -327,7 +380,6 @@ public class CustomPartitionVertex exten
Map<String, List<FileSplit>> pathFileSplitsMap) {
int bucketNum = 0;
- int fsCount = 0;
Multimap<Integer, InputSplit> bucketToInitialSplitMap =
ArrayListMultimap.<Integer, InputSplit> create();
@@ -335,14 +387,20 @@ public class CustomPartitionVertex exten
for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
int bucketId = bucketNum % numBuckets;
for (FileSplit fsplit : entry.getValue()) {
- fsCount++;
bucketToInitialSplitMap.put(bucketId, fsplit);
}
bucketNum++;
}
- LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: "
- + pathFileSplitsMap.size());
+ if (bucketNum < numBuckets) {
+ int loopedBucketId = 0;
+ for (; bucketNum < numBuckets; bucketNum++) {
+ for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) {
+ bucketToInitialSplitMap.put(bucketNum, fsplit);
+ }
+ loopedBucketId++;
+ }
+ }
return bucketToInitialSplitMap;
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Tue Oct 14 19:06:45 2014
@@ -20,6 +20,23 @@ package org.apache.hadoop.hive.ql.exec.t
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
@@ -32,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -47,10 +65,12 @@ import org.apache.hadoop.hive.ql.io.merg
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.BaseWork;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.stats.StatsFactory;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -90,12 +110,16 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
@@ -104,21 +128,6 @@ import org.apache.tez.runtime.library.co
import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
-import javax.security.auth.login.LoginException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
/**
* DagUtils. DagUtils is a collection of helper methods to convert
* map and reduce work to tez vertices and edges. It handles configuration
@@ -130,6 +139,11 @@ public class DagUtils {
private static final Log LOG = LogFactory.getLog(DagUtils.class.getName());
private static final String TEZ_DIR = "_tez_scratch_dir";
private static DagUtils instance;
+ // The merge file being currently processed.
+ public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX =
+ "hive.tez.current.merge.file.prefix";
+ // A comma-separated list of work names used as prefixes.
+ public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
private void addCredentials(MapWork mapWork, DAG dag) {
Set<String> paths = mapWork.getPathToAliases().keySet();
@@ -238,8 +252,8 @@ public class DagUtils {
* endpoints.
*/
@SuppressWarnings("rawtypes")
- public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
- Vertex w, TezEdgeProperty edgeProp)
+ public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w,
+ TezEdgeProperty edgeProp, VertexType vertexType)
throws IOException {
Class mergeInputClass;
@@ -254,10 +268,14 @@ public class DagUtils {
case CUSTOM_EDGE: {
mergeInputClass = ConcatenatedMergedKeyValueInput.class;
int numBuckets = edgeProp.getNumBuckets();
+ CustomVertexConfiguration vertexConf =
+ new CustomVertexConfiguration(numBuckets, vertexType, "");
+ DataOutputBuffer dob = new DataOutputBuffer();
+ vertexConf.write(dob);
VertexManagerPluginDescriptor desc =
VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
- ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
- userPayload.flip();
+ byte[] userPayloadBytes = dob.getData();
+ ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
@@ -289,17 +307,21 @@ public class DagUtils {
* @param w The second vertex (sink)
* @return
*/
- public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
- TezEdgeProperty edgeProp)
+ public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp,
+ VertexType vertexType)
throws IOException {
switch(edgeProp.getEdgeType()) {
case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
- ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
- userPayload.flip();
+ CustomVertexConfiguration vertexConf =
+ new CustomVertexConfiguration(numBuckets, vertexType, "");
+ DataOutputBuffer dob = new DataOutputBuffer();
+ vertexConf.write(dob);
VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
CustomPartitionVertex.class.getName());
+ byte[] userPayloadBytes = dob.getData();
+ ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
desc.setUserPayload(UserPayload.create(userPayload));
w.setVertexManagerPlugin(desc);
break;
@@ -405,7 +427,7 @@ public class DagUtils {
* from yarn. Falls back to Map-reduce's map size if tez
* container size isn't set.
*/
- private Resource getContainerResource(Configuration conf) {
+ public static Resource getContainerResource(Configuration conf) {
int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
@@ -443,12 +465,61 @@ public class DagUtils {
return MRHelpers.getJavaOptsForMRMapper(conf);
}
+ private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
+ List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx,
+ VertexType vertexType)
+ throws Exception {
+ Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
+ if (mergeJoinWork.getMainWork() instanceof MapWork) {
+ List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
+ MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
+ CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
+ Vertex mergeVx =
+ createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
+
+ // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
+ // here would cause premature grouping which would be incorrect.
+ Class inputFormatClass = HiveInputFormat.class;
+ conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+ // mapreduce.tez.input.initializer.serialize.event.payload should be set
+ // to false when using this plug-in to avoid getting a serialized event at run-time.
+ conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false);
+ for (int i = 0; i < mapWorkList.size(); i++) {
+
+ mapWork = (MapWork) (mapWorkList.get(i));
+ conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName());
+ conf.set(Utilities.INPUT_NAME, mapWork.getName());
+ LOG.info("Going through each work and adding MultiMRInput");
+ mergeVx.addDataSource(mapWork.getName(),
+ MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build());
+ }
+
+ VertexManagerPluginDescriptor desc =
+ VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+ CustomVertexConfiguration vertexConf =
+ new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
+ .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias());
+ DataOutputBuffer dob = new DataOutputBuffer();
+ vertexConf.write(dob);
+ byte[] userPayload = dob.getData();
+ desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
+ mergeVx.setVertexManagerPlugin(desc);
+ return mergeVx;
+ } else {
+ Vertex mergeVx =
+ createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs,
+ mrScratchDir, ctx);
+ return mergeVx;
+ }
+ }
+
/*
* Helper function to create Vertex from MapWork.
*/
private Vertex createVertex(JobConf conf, MapWork mapWork,
LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
- Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception {
+ Path mrScratchDir, Context ctx, VertexType vertexType)
+ throws Exception {
Path tezDir = getTezDir(mrScratchDir);
@@ -470,15 +541,8 @@ public class DagUtils {
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
- boolean vertexHasCustomInput = false;
- if (tezWork != null) {
- for (BaseWork baseWork : tezWork.getParents(mapWork)) {
- if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) {
- vertexHasCustomInput = true;
- }
- }
- }
-
+ boolean vertexHasCustomInput = VertexType.isCustomInputType(vertexType);
+ LOG.info("Vertex has custom input? " + vertexHasCustomInput);
if (vertexHasCustomInput) {
groupSplitsInInputInitializer = false;
// grouping happens in execution phase. The input payload should not enable grouping here,
@@ -513,6 +577,8 @@ public class DagUtils {
}
}
+ // remember mapping of plan to input
+ conf.set(Utilities.INPUT_NAME, mapWork.getName());
if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
&& !mapWork.isUseOneNullRowInputFormat()) {
@@ -593,6 +659,7 @@ public class DagUtils {
Path mrScratchDir, Context ctx) throws Exception {
// set up operator plan
+ conf.set(Utilities.INPUT_NAME, reduceWork.getName());
Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false);
// create the directories FileSinkOperators need
@@ -850,7 +917,7 @@ public class DagUtils {
throws IOException {
FileSystem destFS = dest.getFileSystem(conf);
- if (src != null) {
+ if (src != null && checkPreExisting(src, dest, conf) == false) {
// copy the src to the destination and create local resource.
// do not overwrite.
LOG.info("Localizing resource because it does not exist: " + src + " to dest: " + dest);
@@ -904,7 +971,7 @@ public class DagUtils {
public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
hiveConf.setBoolean("mapred.mapper.new-api", false);
- JobConf conf = new JobConf(hiveConf);
+ JobConf conf = new JobConf(new TezConfiguration(hiveConf));
conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
@@ -937,12 +1004,22 @@ public class DagUtils {
return initializeVertexConf(conf, context, (MapWork)work);
} else if (work instanceof ReduceWork) {
return initializeVertexConf(conf, context, (ReduceWork)work);
+ } else if (work instanceof MergeJoinWork) {
+ return initializeVertexConf(conf, context, (MergeJoinWork) work);
} else {
assert false;
return null;
}
}
+ private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) {
+ if (work.getMainWork() instanceof MapWork) {
+ return initializeVertexConf(conf, context, (MapWork) (work.getMainWork()));
+ } else {
+ return initializeVertexConf(conf, context, (ReduceWork) (work.getMainWork()));
+ }
+ }
+
/**
* Create a vertex from a given work object.
*
@@ -958,18 +1035,21 @@ public class DagUtils {
*/
public Vertex createVertex(JobConf conf, BaseWork work,
Path scratchDir, LocalResource appJarLr,
- List<LocalResource> additionalLr,
- FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception {
+ List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,
+ TezWork tezWork, VertexType vertexType) throws Exception {
Vertex v = null;
// simply dispatch the call to the right method for the actual (sub-) type of
// BaseWork.
if (work instanceof MapWork) {
- v = createVertex(conf, (MapWork) work, appJarLr,
- additionalLr, fileSystem, scratchDir, ctx, tezWork);
+ v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx,
+ vertexType);
} else if (work instanceof ReduceWork) {
v = createVertex(conf, (ReduceWork) work, appJarLr,
additionalLr, fileSystem, scratchDir, ctx);
+ } else if (work instanceof MergeJoinWork) {
+ v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
+ ctx, vertexType);
} else {
// something is seriously wrong if this is happening
throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Tue Oct 14 19:06:45 2014
@@ -31,6 +31,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
@@ -77,12 +79,13 @@ public class DynamicPartitionPruner {
private final BytesWritable writable = new BytesWritable();
- private final BlockingQueue<InputInitializerEvent> queue =
- new LinkedBlockingQueue<InputInitializerEvent>();
+ private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+
+ private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
private int sourceInfoCount = 0;
- private InputInitializerContext context;
+ private final Object endOfEvents = new Object();
public DynamicPartitionPruner() {
}
@@ -91,8 +94,21 @@ public class DynamicPartitionPruner {
throws SerDeException, IOException,
InterruptedException, HiveException {
- this.context = context;
- this.initialize(work, jobConf);
+ synchronized(sourcesWaitingForEvents) {
+ initialize(work, jobConf);
+
+ if (sourcesWaitingForEvents.isEmpty()) {
+ return;
+ }
+
+ Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
+ for (String source : sourcesWaitingForEvents) {
+ // we need to get state transition updates for the vertices that will send
+ // events to us. once we have received all events and a vertex has succeeded,
+ // we can move to do the pruning.
+ context.registerForVertexStateUpdates(source, states);
+ }
+ }
LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
// synchronous event processing loop. Won't return until all events have
@@ -102,7 +118,7 @@ public class DynamicPartitionPruner {
LOG.info("Ok to proceed.");
}
- public BlockingQueue<InputInitializerEvent> getQueue() {
+ public BlockingQueue<Object> getQueue() {
return queue;
}
@@ -111,11 +127,14 @@ public class DynamicPartitionPruner {
sourceInfoCount = 0;
}
- private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+ public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
this.clear();
Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+ Set<String> sources = work.getEventSourceTableDescMap().keySet();
+
+ sourcesWaitingForEvents.addAll(sources);
- for (String s : work.getEventSourceTableDescMap().keySet()) {
+ for (String s : sources) {
List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -277,46 +296,30 @@ public class DynamicPartitionPruner {
private void processEvents() throws SerDeException, IOException, InterruptedException {
int eventCount = 0;
- int neededEvents = getExpectedNumberOfEvents();
- while (neededEvents > eventCount) {
- InputInitializerEvent event = queue.take();
+ while (true) {
+ Object element = queue.take();
+
+ if (element == endOfEvents) {
+ // we're done processing events
+ break;
+ }
+
+ InputInitializerEvent event = (InputInitializerEvent) element;
+
LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
+ ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
- processPayload(event.getUserPayload());
+ processPayload(event.getUserPayload(), event.getSourceVertexName());
eventCount += 1;
- neededEvents = getExpectedNumberOfEvents();
- LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
}
- }
-
- private int getExpectedNumberOfEvents() throws InterruptedException {
- int neededEvents = 0;
-
- boolean notInitialized;
- do {
- neededEvents = 0;
- notInitialized = false;
- for (String s : sourceInfoMap.keySet()) {
- int multiplier = sourceInfoMap.get(s).size();
- int taskNum = context.getVertexNumTasks(s);
- LOG.info("Vertex " + s + " has " + taskNum + " events.");
- if (taskNum < 0) {
- notInitialized = true;
- Thread.sleep(10);
- continue;
- }
- neededEvents += (taskNum * multiplier);
- }
- } while (notInitialized);
-
- return neededEvents;
+ LOG.info("Received events: " + eventCount);
}
@SuppressWarnings("deprecation")
- private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
+ private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
+ IOException {
+
DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
- String sourceName = in.readUTF();
String columnName = in.readUTF();
boolean skip = in.readBoolean();
@@ -390,4 +393,26 @@ public class DynamicPartitionPruner {
}
}
+ public void addEvent(InputInitializerEvent event) {
+ synchronized(sourcesWaitingForEvents) {
+ if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
+ queue.offer(event);
+ }
+ }
+ }
+
+ public void processVertex(String name) {
+ LOG.info("Vertex succeeded: " + name);
+
+ synchronized(sourcesWaitingForEvents) {
+ sourcesWaitingForEvents.remove(name);
+
+ if (sourcesWaitingForEvents.isEmpty()) {
+ // we've got what we need; mark the queue
+ queue.offer(endOfEvents);
+ } else {
+ LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
+ }
+ }
+ }
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Tue Oct 14 19:06:45 2014
@@ -38,8 +38,9 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TaskLocationHint;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.mapreduce.hadoop.InputSplitInfoMem;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.protos.MRRuntimeProtos.MRInputUserPayloadProto;
@@ -152,8 +153,21 @@ public class HiveSplitGenerator extends
public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
Configuration conf, InputSplit[] splits, float waves, int availableSlots)
throws Exception {
+ return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null);
+ }
- MapWork work = Utilities.getMapWork(jobConf);
+ public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+ Configuration conf, InputSplit[] splits, float waves, int availableSlots,
+ String inputName) throws Exception {
+
+ MapWork work = null;
+ if (inputName != null) {
+ work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
+ // work can still be null if there is no merge work for this input
+ }
+ if (work == null) {
+ work = Utilities.getMapWork(jobConf);
+ }
Multimap<Integer, InputSplit> bucketSplitMultiMap =
ArrayListMultimap.<Integer, InputSplit> create();
@@ -230,9 +244,14 @@ public class HiveSplitGenerator extends
}
@Override
+ public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
+ pruner.processVertex(stateUpdate.getVertexName());
+ }
+
+ @Override
public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
for (InputInitializerEvent e : events) {
- pruner.getQueue().put(e);
+ pruner.addEvent(e);
}
}
}
Modified: hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1631841&r1=1631840&r2=1631841&view=diff
==============================================================================
--- hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Oct 14 19:06:45 2014
@@ -17,14 +17,20 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
import org.apache.hadoop.hive.ql.exec.HashTableDummyOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -36,15 +42,17 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
+import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.util.StringUtils;
import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
+import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
@@ -58,27 +66,61 @@ public class MapRecordProcessor extends
private MapOperator mapOp;
+ private final List<MapOperator> mergeMapOpList = new ArrayList<MapOperator>();
public static final Log l4j = LogFactory.getLog(MapRecordProcessor.class);
- private final ExecMapperContext execContext = new ExecMapperContext();
+ private MapRecordSource[] sources;
+ private final Map<String, MultiMRInput> multiMRInputMap = new HashMap<String, MultiMRInput>();
+ private int position = 0;
+ private boolean foundCachedMergeWork = false;
+ MRInputLegacy legacyMRInput = null;
+ private ExecMapperContext execContext = null;
private boolean abort = false;
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
private MapWork mapWork;
+ List<MapWork> mergeWorkList = null;
+ private static Map<Integer, DummyStoreOperator> connectOps =
+ new TreeMap<Integer, DummyStoreOperator>();
- public MapRecordProcessor(JobConf jconf) {
+ public MapRecordProcessor(JobConf jconf) throws Exception {
ObjectCache cache = ObjectCacheFactory.getCache(jconf);
+ execContext = new ExecMapperContext(jconf);
execContext.setJc(jconf);
// create map and fetch operators
mapWork = (MapWork) cache.retrieve(MAP_PLAN_KEY);
if (mapWork == null) {
mapWork = Utilities.getMapWork(jconf);
cache.cache(MAP_PLAN_KEY, mapWork);
- l4j.info("Plan: "+mapWork);
+ l4j.debug("Plan: " + mapWork);
for (String s: mapWork.getAliases()) {
- l4j.info("Alias: "+s);
+ l4j.debug("Alias: " + s);
}
} else {
Utilities.setMapWork(jconf, mapWork);
}
+
+ String prefixes = jconf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+ if (prefixes != null) {
+ mergeWorkList = new ArrayList<MapWork>();
+ for (String prefix : prefixes.split(",")) {
+ MapWork mergeMapWork = (MapWork) cache.retrieve(prefix);
+ if (mergeMapWork != null) {
+ l4j.info("Found merge work in cache");
+ foundCachedMergeWork = true;
+ mergeWorkList.add(mergeMapWork);
+ continue;
+ }
+ if (foundCachedMergeWork) {
+ throw new Exception(
+ "Should find all work in cache else operator pipeline will be in non-deterministic state");
+ }
+
+ if ((prefix != null) && (prefix.isEmpty() == false)) {
+ mergeMapWork = (MapWork) Utilities.getMergeWork(jconf, prefix);
+ mergeWorkList.add(mergeMapWork);
+ cache.cache(prefix, mergeMapWork);
+ }
+ }
+ }
}
@Override
@@ -88,8 +130,8 @@ public class MapRecordProcessor extends
super.init(jconf, processorContext, mrReporter, inputs, outputs);
//Update JobConf using MRInput, info like filename comes via this
- MRInputLegacy mrInput = TezProcessor.getMRInput(inputs);
- Configuration updatedConf = mrInput.getConfigUpdates();
+ legacyMRInput = getMRInput(inputs);
+ Configuration updatedConf = legacyMRInput.getConfigUpdates();
if (updatedConf != null) {
for (Entry<String, String> entry : updatedConf) {
jconf.set(entry.getKey(), entry.getValue());
@@ -99,20 +141,52 @@ public class MapRecordProcessor extends
createOutputMap();
// Start all the Outputs.
for (Entry<String, LogicalOutput> outputEntry : outputs.entrySet()) {
- l4j.info("Starting Output: " + outputEntry.getKey());
+ l4j.debug("Starting Output: " + outputEntry.getKey());
outputEntry.getValue().start();
((TezKVOutputCollector) outMap.get(outputEntry.getKey())).initialize();
}
try {
+
if (mapWork.getVectorMode()) {
mapOp = new VectorMapOperator();
} else {
mapOp = new MapOperator();
}
+ connectOps.clear();
+ if (mergeWorkList != null) {
+ MapOperator mergeMapOp = null;
+ for (MapWork mergeMapWork : mergeWorkList) {
+ processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs
+ .get(mergeMapWork.getName()))));
+ if (mergeMapWork.getVectorMode()) {
+ mergeMapOp = new VectorMapOperator();
+ } else {
+ mergeMapOp = new MapOperator();
+ }
+
+ mergeMapOpList.add(mergeMapOp);
+ // initialize the merge operators first.
+ if (mergeMapOp != null) {
+ mergeMapOp.setConf(mergeMapWork);
+ l4j.info("Input name is " + mergeMapWork.getName());
+ jconf.set(Utilities.INPUT_NAME, mergeMapWork.getName());
+ mergeMapOp.setChildren(jconf);
+ if (foundCachedMergeWork == false) {
+ DummyStoreOperator dummyOp = getJoinParentOp(mergeMapOp);
+ connectOps.put(mergeMapWork.getTag(), dummyOp);
+ }
+ mergeMapOp.setExecContext(new ExecMapperContext(jconf));
+ mergeMapOp.initializeLocalWork(jconf);
+ }
+ }
+ }
+
// initialize map operator
mapOp.setConf(mapWork);
+ l4j.info("Main input name is " + mapWork.getName());
+ jconf.set(Utilities.INPUT_NAME, mapWork.getName());
mapOp.setChildren(jconf);
l4j.info(mapOp.dump(0));
@@ -121,12 +195,21 @@ public class MapRecordProcessor extends
((TezContext) MapredContext.get()).setTezProcessorContext(processorContext);
mapOp.setExecContext(execContext);
mapOp.initializeLocalWork(jconf);
+
+ initializeMapRecordSources();
mapOp.initialize(jconf, null);
+ if ((mergeMapOpList != null) && mergeMapOpList.isEmpty() == false) {
+ for (MapOperator mergeMapOp : mergeMapOpList) {
+ jconf.set(Utilities.INPUT_NAME, mergeMapOp.getConf().getName());
+ mergeMapOp.initialize(jconf, null);
+ }
+ }
// Initialization isn't finished until all parents of all operators
// are initialized. For broadcast joins that means initializing the
// dummy parent operators as well.
List<HashTableDummyOperator> dummyOps = mapWork.getDummyOps();
+ jconf.set(Utilities.INPUT_NAME, mapWork.getName());
if (dummyOps != null) {
for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
dummyOp.setExecContext(execContext);
@@ -151,54 +234,46 @@ public class MapRecordProcessor extends
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
}
- @Override
- void run() throws IOException{
-
- MRInputLegacy in = TezProcessor.getMRInput(inputs);
- KeyValueReader reader = in.getReader();
+ private void initializeMapRecordSources() throws Exception {
+ int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself
+ sources = new MapRecordSource[size];
+ KeyValueReader reader = legacyMRInput.getReader();
+ position = mapOp.getConf().getTag();
+ sources[position] = new MapRecordSource();
+ sources[position].init(jconf, mapOp, reader);
+ for (MapOperator mapOp : mergeMapOpList) {
+ int tag = mapOp.getConf().getTag();
+ sources[tag] = new MapRecordSource();
+ String inputName = mapOp.getConf().getName();
+ MultiMRInput multiMRInput = multiMRInputMap.get(inputName);
+ Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders();
+ l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName);
+ List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders);
+ reader = new KeyValueInputMerger(kvReaderList);
+ sources[tag].init(jconf, mapOp, reader);
+ }
+ ((TezContext) MapredContext.get()).setRecordSources(sources);
+ }
- //process records until done
- while(reader.next()){
- //ignore the key for maps - reader.getCurrentKey();
- Object value = reader.getCurrentValue();
- boolean needMore = processRow(value);
- if(!needMore){
- break;
+ private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) {
+ for (Operator<? extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) {
+ if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) {
+ return (DummyStoreOperator) childOp;
+ } else {
+ return getJoinParentOp(childOp);
}
}
+ return null;
}
+ @Override
+ void run() throws Exception {
- /**
- * @param value value to process
- * @return true if it is not done and can take more inputs
- */
- private boolean processRow(Object value) {
- // reset the execContext for each new row
- execContext.resetRow();
-
- try {
- if (mapOp.getDone()) {
- return false; //done
- } else {
- // Since there is no concept of a group, we don't invoke
- // startGroup/endGroup for a mapper
- mapOp.process((Writable)value);
- if (isLogInfoEnabled) {
- logProgress();
- }
- }
- } catch (Throwable e) {
- abort = true;
- if (e instanceof OutOfMemoryError) {
- // Don't create a new object if we are already out of memory
- throw (OutOfMemoryError) e;
- } else {
- l4j.fatal(StringUtils.stringifyException(e));
- throw new RuntimeException(e);
+ while (sources[position].pushRecord()) {
+ if (isLogInfoEnabled) {
+ logProgress();
}
}
- return true; //give me more
}
@Override
@@ -214,6 +289,11 @@ public class MapRecordProcessor extends
return;
}
mapOp.close(abort);
+ if (mergeMapOpList.isEmpty() == false) {
+ for (MapOperator mergeMapOp : mergeMapOpList) {
+ mergeMapOp.close(abort);
+ }
+ }
// Need to close the dummyOps as well. The operator pipeline
// is not considered "closed/done" unless all operators are
@@ -242,4 +322,27 @@ public class MapRecordProcessor extends
MapredContext.close();
}
}
+
+ public static Map<Integer, DummyStoreOperator> getConnectOps() {
+ return connectOps;
+ }
+
+ private MRInputLegacy getMRInput(Map<String, LogicalInput> inputs) throws Exception {
+ // there should be only one MRInput
+ MRInputLegacy theMRInput = null;
+ l4j.info("The input names are: " + Arrays.toString(inputs.keySet().toArray()));
+ for (Entry<String, LogicalInput> inp : inputs.entrySet()) {
+ if (inp.getValue() instanceof MRInputLegacy) {
+ if (theMRInput != null) {
+ throw new IllegalArgumentException("Only one MRInput is expected");
+ }
+ // a better logic would be to find the alias
+ theMRInput = (MRInputLegacy) inp.getValue();
+ } else if (inp.getValue() instanceof MultiMRInput) {
+ multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue());
+ }
+ }
+ theMRInput.init();
+ return theMRInput;
+ }
}