Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC
svn commit: r1635536 [8/28] - in /hive/branches/spark: ./ accumulo-handler/
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/
accumulo-handler/src/test/org/apache/hadoo...
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Thu Oct 30 16:22:33 2014
@@ -25,8 +25,6 @@ import java.util.Arrays;
import java.util.List;
import java.util.Random;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -51,10 +49,13 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.util.hash.MurmurHash;
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
/**
* Reduce Sink Operator sends output to the reduce stage.
**/
@@ -65,10 +66,13 @@ public class ReduceSinkOperator extends
PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
}
- private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
- private static final boolean isInfoEnabled = LOG.isInfoEnabled();
- private static final boolean isDebugEnabled = LOG.isDebugEnabled();
- private static final boolean isTraceEnabled = LOG.isTraceEnabled();
+ /**
+ * Counters.
+ */
+ public static enum Counter {
+ RECORDS_OUT_INTERMEDIATE
+ }
+
private static final long serialVersionUID = 1L;
private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();
@@ -110,7 +114,7 @@ public class ReduceSinkOperator extends
protected transient int numDistributionKeys;
protected transient int numDistinctExprs;
protected transient String[] inputAliases; // input aliases of this RS for join (used for PPD)
- protected transient boolean autoParallel = false;
+ protected transient boolean useUniformHash = false;
// picks topN K:V pairs from input.
protected transient TopNHash reducerHash = new TopNHash();
protected transient HiveKey keyWritable = new HiveKey();
@@ -144,12 +148,25 @@ public class ReduceSinkOperator extends
private StructObjectInspector recIdInspector; // OI for the record identifier
private IntObjectInspector bucketInspector; // OI for the bucket field in the record id
+ protected transient long numRows = 0;
+ protected transient long cntr = 1;
+ private final transient LongWritable recordCounter = new LongWritable();
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
+
+ numRows = 0;
+
+ String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ","_");
+ }
+ statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+
List<ExprNodeDesc> keys = conf.getKeyCols();
- if (isDebugEnabled) {
+ if (isLogDebugEnabled) {
LOG.debug("keys size is " + keys.size());
for (ExprNodeDesc k : keys) {
LOG.debug("Key exprNodeDesc " + k.getExprString());
@@ -194,7 +211,7 @@ public class ReduceSinkOperator extends
tag = conf.getTag();
tagByte[0] = (byte) tag;
skipTag = conf.getSkipTag();
- if (isInfoEnabled) {
+ if (isLogInfoEnabled) {
LOG.info("Using tag = " + tag);
}
@@ -217,7 +234,7 @@ public class ReduceSinkOperator extends
reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
}
- autoParallel = conf.isAutoParallel();
+ useUniformHash = conf.getReducerTraits().contains(UNIFORM);
firstRow = true;
initializeChildren(hconf);
@@ -296,7 +313,7 @@ public class ReduceSinkOperator extends
bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
}
- if (isInfoEnabled) {
+ if (isLogInfoEnabled) {
LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " +
conf.getNumDistributionKeys());
}
@@ -339,10 +356,10 @@ public class ReduceSinkOperator extends
final int hashCode;
// distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
- if (autoParallel && partitionEval.length > 0) {
+ if (useUniformHash && partitionEval.length > 0) {
hashCode = computeMurmurHash(firstKey);
} else {
- hashCode = computeHashCode(row);
+ hashCode = computeHashCode(row, bucketNumber);
}
firstKey.setHashCode(hashCode);
@@ -391,7 +408,7 @@ public class ReduceSinkOperator extends
// column directly.
Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
- if (isTraceEnabled) {
+ if (isLogTraceEnabled) {
LOG.trace("Acid choosing bucket number " + buckNum);
}
} else {
@@ -438,7 +455,7 @@ public class ReduceSinkOperator extends
return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
}
- private int computeHashCode(Object row) throws HiveException {
+ private int computeHashCode(Object row, int buckNum) throws HiveException {
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
@@ -462,10 +479,11 @@ public class ReduceSinkOperator extends
+ ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
}
}
- if (isTraceEnabled) {
- LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber));
+ int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
+ if (isLogTraceEnabled) {
+ LOG.trace("Going to return hash code " + hashCode);
}
- return bucketNumber < 0 ? keyHashCode : keyHashCode * 31 + bucketNumber;
+ return hashCode;
}
private boolean partitionKeysAreNull(Object row) throws HiveException {
@@ -506,6 +524,13 @@ public class ReduceSinkOperator extends
// Since this is a terminal operator, update counters explicitly -
// forward is not called
if (null != out) {
+ numRows++;
+ if (isLogInfoEnabled) {
+ if (numRows == cntr) {
+ cntr *= 10;
+ LOG.info(toString() + ": records written - " + numRows);
+ }
+ }
out.collect(keyWritable, valueWritable);
}
}
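
The logging added above follows a power-of-ten cadence: cntr starts at 1 and is multiplied by 10 each time numRows catches up to it, so a line is logged after rows 1, 10, 100, 1000, and so on. A minimal standalone sketch of that cadence (class name and row count are illustrative, not part of this commit):

    public class RecordCountLogDemo {
      public static void main(String[] args) {
        long numRows = 0;
        long cntr = 1;
        for (int i = 0; i < 12345; i++) {
          numRows++;
          if (numRows == cntr) {
            cntr *= 10;
            System.out.println("records written - " + numRows); // rows 1, 10, 100, ...
          }
        }
        // Final total, mirroring the log and counter update in closeOp() below.
        System.out.println("records written - " + numRows);
      }
    }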
@@ -535,6 +560,10 @@ public class ReduceSinkOperator extends
}
super.closeOp(abort);
out = null;
+ if (isLogInfoEnabled) {
+ LOG.info(toString() + ": records written - " + numRows);
+ }
+ recordCounter.set(numRows);
}
/**
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java Thu Oct 30 16:22:33 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Iterator;
/**
* RowSchema Implementation.
@@ -49,6 +50,51 @@ public class RowSchema implements Serial
}
@Override
+ public boolean equals(Object obj) {
+ if (!(obj instanceof RowSchema) || (obj == null)) {
+ return false;
+ }
+ if(this == obj) {
+ return true;
+ }
+
+ RowSchema dest = (RowSchema)obj;
+ if(this.signature == null && dest.getSignature() == null) {
+ return true;
+ }
+ if((this.signature == null && dest.getSignature() != null) ||
+ (this.signature != null && dest.getSignature() == null) ) {
+ return false;
+ }
+
+ if(this.signature.size() != dest.getSignature().size()) {
+ return false;
+ }
+
+ Iterator<ColumnInfo> origIt = this.signature.iterator();
+ Iterator<ColumnInfo> destIt = dest.getSignature().iterator();
+ while(origIt.hasNext()) {
+ ColumnInfo origColumn = origIt.next();
+ ColumnInfo destColumn = destIt.next();
+
+ if(origColumn == null && destColumn == null) {
+ continue;
+ }
+
+ if((origColumn == null && destColumn != null) ||
+ (origColumn != null && destColumn == null) ) {
+ return false;
+ }
+
+ if(!origColumn.equals(destColumn)) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append('(');
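
Note that equals() is overridden above without a matching hashCode(), so hash-based collections would still fall back to Object.hashCode(). A hedged sketch of a hashCode() consistent with that equals(), assuming ColumnInfo's hashCode() agrees with its equals() (illustrative only, not part of this commit):

    @Override
    public int hashCode() {
      // Schemas that compare equal under the equals() above hash equally.
      if (signature == null) {
        return 0;
      }
      int result = 1;
      for (ColumnInfo col : signature) {
        result = 31 * result + (col == null ? 0 : col.hashCode());
      }
      return result;
    }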
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Thu Oct 30 16:22:33 2014
@@ -27,8 +27,10 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
+import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.TimeUnit;
@@ -83,6 +85,8 @@ public class ScriptOperator extends Oper
transient Deserializer scriptOutputDeserializer;
transient volatile Throwable scriptError = null;
transient RecordWriter scriptOutWriter = null;
+ // List of conf entries not to turn into env vars
+ transient Set<String> blackListedConfEntries = null;
static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
static final String IO_EXCEPTION_STREAM_CLOSED = "Stream closed";
@@ -120,7 +124,8 @@ public class ScriptOperator extends Oper
/**
* Most UNIX implementations impose some limit on the total size of environment variables and
- * size of strings. To fit in this limit we need sometimes to truncate strings.
+ * size of strings. To fit in this limit we sometimes need to truncate strings. Also,
+ * some values tend to be long and are meaningless to scripts, so we filter them out.
* @param value environment variable value to check
* @param name name of variable (used only for logging purposes)
* @param truncate truncate value or not
@@ -139,6 +144,23 @@ public class ScriptOperator extends Oper
return value;
}
+ boolean blackListed(String name) {
+ if (blackListedConfEntries == null) {
+ blackListedConfEntries = new HashSet<String>();
+ if (hconf != null) {
+ String bl = hconf.get(HiveConf.ConfVars.HIVESCRIPT_ENV_BLACKLIST.toString());
+ if (bl != null && bl.length() > 0) {
+ String[] bls = bl.split(",");
+ for (String b : bls) {
+ blackListedConfEntries.add(b);
+ }
+ }
+ }
+ }
+ return blackListedConfEntries.contains(name);
+ }
+
/**
* addJobConfToEnvironment is mostly shamelessly copied from hadoop streaming. Added additional
* check on environment variable length
@@ -148,13 +170,16 @@ public class ScriptOperator extends Oper
while (it.hasNext()) {
Map.Entry<String, String> en = it.next();
String name = en.getKey();
- // String value = (String)en.getValue(); // does not apply variable
- // expansion
- String value = conf.get(name); // does variable expansion
- name = safeEnvVarName(name);
- boolean truncate = conf.getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false);
- value = safeEnvVarValue(value, name, truncate);
- env.put(name, value);
+ if (!blackListed(name)) {
+ // String value = (String)en.getValue(); // does not apply variable
+ // expansion
+ String value = conf.get(name); // does variable expansion
+ name = safeEnvVarName(name);
+ boolean truncate = conf
+ .getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false);
+ value = safeEnvVarValue(value, name, truncate);
+ env.put(name, value);
+ }
}
}
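
A standalone sketch of the filtering wired in above: blacklisted configuration names are skipped, and the remaining names are made env-safe before export (the property values and class name here are made up; the dot replacement mirrors safeEnvVarName()):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    public class EnvBlacklistDemo {
      public static void main(String[] args) {
        // e.g. hive.script.operator.env.blacklist=hive.txn.valid.txns
        Set<String> blackListed = new HashSet<String>();
        for (String b : "hive.txn.valid.txns".split(",")) {
          blackListed.add(b);
        }

        Map<String, String> conf = new HashMap<String, String>();
        conf.put("mapreduce.job.name", "demo");
        conf.put("hive.txn.valid.txns", "a:very:long:value:meaningless:to:scripts");

        Map<String, String> env = new HashMap<String, String>();
        for (Map.Entry<String, String> en : conf.entrySet()) {
          if (!blackListed.contains(en.getKey())) {
            // env var names cannot contain dots
            env.put(en.getKey().replace('.', '_'), en.getValue());
          }
        }
        System.out.println(env); // {mapreduce_job_name=demo}
      }
    }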
@@ -238,8 +263,8 @@ public class ScriptOperator extends Oper
protected void initializeOp(Configuration hconf) throws HiveException {
firstRow = true;
- statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
- statsMap.put(Counter.SERIALIZE_ERRORS, serialize_error_count);
+ statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+ statsMap.put(Counter.SERIALIZE_ERRORS.toString(), serialize_error_count);
try {
this.hconf = hconf;
@@ -285,6 +310,16 @@ public class ScriptOperator extends Oper
return;
}
+ private transient String tableName;
+ private transient String partitionName ;
+
+ @Override
+ public void setInputContext(String inputPath, String tableName, String partitionName) {
+ this.tableName = tableName;
+ this.partitionName = partitionName;
+ super.setInputContext(inputPath, tableName, partitionName);
+ }
+
@Override
public void processOp(Object row, int tag) throws HiveException {
// initialize the user's process only when you receive the first row
@@ -313,10 +348,8 @@ public class ScriptOperator extends Oper
String[] wrappedCmdArgs = addWrapper(cmdArgs);
LOG.info("Executing " + Arrays.asList(wrappedCmdArgs));
- LOG.info("tablename="
- + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname));
- LOG.info("partname="
- + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname));
+ LOG.info("tablename=" + tableName);
+ LOG.info("partname=" + partitionName);
LOG.info("alias=" + alias);
ProcessBuilder pb = new ProcessBuilder(wrappedCmdArgs);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Oct 30 16:22:33 2014
@@ -125,4 +125,31 @@ public class SelectOperator extends Oper
public boolean acceptLimitPushdown() {
return true;
}
+
+ /**
+ * Checks whether this select operator modifies its input tuples in any way.
+ *
+ * @return true if this is an identity select operator, false otherwise
+ */
+ public boolean isIdentitySelect() {
+ //Safety check
+ if(this.getNumParent() != 1) {
+ return false;
+ }
+
+ //Select *
+ if(this.getConf().isSelStarNoCompute() ||
+ this.getConf().isSelectStar()) {
+ return true;
+ }
+
+ //Check whether they have the same schema
+ if(!OperatorUtils.sameRowSchema(this, this.getParentOperators().get(0))) {
+ return false;
+ }
+
+ return true;
+ }
+
}
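
A standalone sketch of the decision above, with booleans standing in for the operator and descriptor state (names are hypothetical): a SELECT is an identity if it has exactly one parent and either is a star select or preserves its parent's row schema.

    public class IdentitySelectDemo {
      static boolean isIdentitySelect(int numParents, boolean selStarNoCompute,
          boolean selectStar, boolean sameSchemaAsParent) {
        if (numParents != 1) {
          return false;                 // safety check
        }
        if (selStarNoCompute || selectStar) {
          return true;                  // SELECT *
        }
        return sameSchemaAsParent;      // schema unchanged => identity
      }

      public static void main(String[] args) {
        System.out.println(isIdentitySelect(1, false, true, false));  // true
        System.out.println(isIdentitySelect(2, false, true, true));   // false
        System.out.println(isIdentitySelect(1, false, false, true));  // true
      }
    }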
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java Thu Oct 30 16:22:33 2014
@@ -153,7 +153,9 @@ public class StatsNoJobTask extends Task
partn.getInputFormatClass(), jc);
InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0,
new String[] { partn.getLocation() });
- Object recordReader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
+ org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
+ (org.apache.hadoop.mapred.RecordReader<?, ?>)
+ inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
StatsProvidingRecordReader statsRR;
if (recordReader instanceof StatsProvidingRecordReader) {
statsRR = (StatsProvidingRecordReader) recordReader;
@@ -163,6 +165,7 @@ public class StatsNoJobTask extends Task
numFiles += 1;
statsAvailable = true;
}
+ recordReader.close();
}
}
@@ -254,6 +257,7 @@ public class StatsNoJobTask extends Task
numFiles += 1;
statsAvailable = true;
}
+ recordReader.close();
}
}
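
The recordReader.close() calls added above run outside any finally block, so a reader could be leaked if stats gathering throws. A minimal sketch of the close-in-finally shape (a generic Closeable stands in for the record reader; illustrative only, not part of this commit):

    import java.io.Closeable;
    import java.io.IOException;

    public class CloseInFinallyDemo {
      static void gatherStats(Closeable recordReader) throws IOException {
        try {
          // ... read stats from the reader; this may throw ...
        } finally {
          recordReader.close(); // released even on failure
        }
      }

      public static void main(String[] args) throws IOException {
        gatherStats(new Closeable() {
          @Override
          public void close() {
            System.out.println("closed");
          }
        });
      }
    }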
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Thu Oct 30 16:22:33 2014
@@ -80,7 +80,7 @@ public class UnionOperator extends Opera
for (int p = 0; p < parents; p++) {
assert (parentFields[p].size() == columns);
for (int c = 0; c < columns; c++) {
- if (!columnTypeResolvers[c].update(parentFields[p].get(c)
+ if (!columnTypeResolvers[c].updateForUnionAll(parentFields[p].get(c)
.getFieldObjectInspector())) {
// checked in SemanticAnalyzer. Should not happen
throw new HiveException("Incompatible types for union operator");
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Oct 30 16:22:33 2014
@@ -416,7 +416,7 @@ public final class Utilities {
}
gWorkMap.put(path, gWork);
} else {
- LOG.debug("Found plan in cache.");
+ LOG.debug("Found plan in cache for name: " + name);
gWork = gWorkMap.get(path);
}
return gWork;
@@ -437,20 +437,20 @@ public final class Utilities {
}
}
- public static Map<String, Map<Integer, String>> getScratchColumnVectorTypes(Configuration hiveConf) {
+ public static Map<String, Map<Integer, String>> getAllScratchColumnVectorTypeMaps(Configuration hiveConf) {
BaseWork baseWork = getMapWork(hiveConf);
if (baseWork == null) {
baseWork = getReduceWork(hiveConf);
}
- return baseWork.getScratchColumnVectorTypes();
+ return baseWork.getAllScratchColumnVectorTypeMaps();
}
- public static Map<String, Map<String, Integer>> getScratchColumnMap(Configuration hiveConf) {
+ public static Map<String, Map<String, Integer>> getAllColumnVectorMaps(Configuration hiveConf) {
BaseWork baseWork = getMapWork(hiveConf);
if (baseWork == null) {
baseWork = getReduceWork(hiveConf);
}
- return baseWork.getScratchColumnMap();
+ return baseWork.getAllColumnVectorMaps();
}
public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
@@ -1635,12 +1635,13 @@ public final class Utilities {
* Group 6: copy [copy keyword]
* Group 8: 2 [copy file index]
*/
+ private static final String COPY_KEYWORD = "_copy_"; // copy keyword
private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX =
Pattern.compile("^.*?"+ // any prefix
"([0-9]+)"+ // taskId
"(_)"+ // separator
"([0-9]{1,6})?"+ // attemptId (limited to 6 digits)
- "((_)(\\Bcopy\\B)(_)"+ // copy keyword
+ "((_)(\\Bcopy\\B)(_)" +
"([0-9]{1,6})$)?"+ // copy file index
"(\\..*)?$"); // any suffix/file extension
@@ -2035,6 +2036,15 @@ public final class Utilities {
return false;
}
+ public static String getBucketFileNameFromPathSubString(String bucketName) {
+ try {
+ return bucketName.split(COPY_KEYWORD)[0];
+ } catch (Exception e) {
+ e.printStackTrace();
+ return bucketName;
+ }
+ }
+
public static String getNameMessage(Exception e) {
return e.getClass().getName() + "(" + e.getMessage() + ")";
}
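
A standalone sketch of what getBucketFileNameFromPathSubString() above computes: everything before the _copy_ keyword, so every copy of a bucket file maps back to the same bucket name (demo class name is hypothetical):

    public class BucketNameDemo {
      private static final String COPY_KEYWORD = "_copy_";

      static String bucketFileName(String name) {
        // keep only the original bucket file name before any copy suffix
        return name.split(COPY_KEYWORD)[0];
      }

      public static void main(String[] args) {
        System.out.println(bucketFileName("000000_0"));         // 000000_0
        System.out.println(bucketFileName("000000_0_copy_2"));  // 000000_0
      }
    }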
@@ -2067,15 +2077,21 @@ public final class Utilities {
public static ClassLoader getSessionSpecifiedClassLoader() {
SessionState state = SessionState.get();
if (state == null || state.getConf() == null) {
- LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+ }
return JavaUtils.getClassLoader();
}
ClassLoader sessionCL = state.getConf().getClassLoader();
- if (sessionCL != null){
- LOG.debug("Use session specified class loader");
+ if (sessionCL != null) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Use session specified class loader");
+ }
return sessionCL;
}
- LOG.debug("Session specified class loader not found, use thread based class loader");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Session specified class loader not found, use thread based class loader");
+ }
return JavaUtils.getClassLoader();
}
@@ -2363,6 +2379,32 @@ public final class Utilities {
}
}
+ /**
+ * Copies the storage handler properties configured for a table descriptor to a runtime job
+ * configuration. This differs from {@link #copyTableJobPropertiesToConf(org.apache.hadoop.hive.ql.plan.TableDesc, org.apache.hadoop.mapred.JobConf)}
+ * in that it does not allow parameters already set in the job to override the values from the
+ * table. This is important for setting the config up for reading,
+ * as the job may already have values in it from another table.
+ * @param tbl
+ * @param job
+ */
+ public static void copyTablePropertiesToConf(TableDesc tbl, JobConf job) {
+ Properties tblProperties = tbl.getProperties();
+ for(String name: tblProperties.stringPropertyNames()) {
+ String val = (String) tblProperties.get(name);
+ if (val != null) {
+ job.set(name, StringEscapeUtils.escapeJava(val));
+ }
+ }
+ Map<String, String> jobProperties = tbl.getJobProperties();
+ if (jobProperties == null) {
+ return;
+ }
+ for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
+ job.set(entry.getKey(), entry.getValue());
+ }
+ }
+
private static final Object INPUT_SUMMARY_LOCK = new Object();
/**
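
A minimal sketch of the precedence implemented by copyTablePropertiesToConf() above, with plain collections standing in for TableDesc and JobConf (the StringEscapeUtils.escapeJava() call is omitted): table properties overwrite whatever is already in the job, and per-table job properties are applied last.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    public class TablePropsToConfDemo {
      public static void main(String[] args) {
        Map<String, String> job = new HashMap<String, String>();
        job.put("io.format", "stale-value-from-another-table");

        Properties tblProperties = new Properties();
        tblProperties.setProperty("io.format", "orc");

        Map<String, String> jobProperties = new HashMap<String, String>();
        jobProperties.put("io.format.options", "compress=zlib");

        for (String name : tblProperties.stringPropertyNames()) {
          job.put(name, tblProperties.getProperty(name)); // table wins over job
        }
        for (Map.Entry<String, String> e : jobProperties.entrySet()) {
          job.put(e.getKey(), e.getValue());              // job properties applied last
        }
        System.out.println(job); // io.format=orc, io.format.options=compress=zlib
      }
    }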
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Oct 30 16:22:33 2014
@@ -56,6 +56,8 @@ import org.apache.hadoop.hive.ql.exec.Pa
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
@@ -416,6 +418,13 @@ public class ExecDriver extends Task<Map
Utilities.createTmpDirs(job, mWork);
Utilities.createTmpDirs(job, rWork);
+ SessionState ss = SessionState.get();
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+ && ss != null) {
+ TezSessionState session = ss.getTezSession();
+ TezSessionPoolManager.getInstance().close(session, true);
+ }
+
// Finally SUBMIT the JOB!
rj = jc.submitJob(job);
// replace it back
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Thu Oct 30 16:22:33 2014
@@ -26,8 +26,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -72,9 +75,6 @@ public class ExecMapper extends MapReduc
private static boolean done;
// used to log memory usage periodically
- public static MemoryMXBean memoryMXBean;
- private long numRows = 0;
- private long nextCntr = 1;
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
@@ -84,8 +84,6 @@ public class ExecMapper extends MapReduc
public void configure(JobConf job) {
execContext = new ExecMapperContext(job);
// Allocate the bean at the beginning -
- memoryMXBean = ManagementFactory.getMemoryMXBean();
- l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
isLogInfoEnabled = l4j.isInfoEnabled();
@@ -176,15 +174,6 @@ public class ExecMapper extends MapReduc
// Since there is no concept of a group, we don't invoke
// startGroup/endGroup for a mapper
mo.process((Writable)value);
- if (isLogInfoEnabled) {
- numRows++;
- if (numRows == nextCntr) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processing " + numRows
- + " rows: used memory = " + used_memory);
- nextCntr = getNextCntr(numRows);
- }
- }
}
} catch (Throwable e) {
abort = true;
@@ -198,18 +187,6 @@ public class ExecMapper extends MapReduc
}
}
-
- private long getNextCntr(long cntr) {
- // A very simple counter to keep track of number of rows processed by the
- // reducer. It dumps
- // every 1 million times, and quickly before that
- if (cntr >= 1000000) {
- return cntr + 1000000;
- }
-
- return 10 * cntr;
- }
-
@Override
public void close() {
// No row was processed
@@ -245,13 +222,7 @@ public class ExecMapper extends MapReduc
}
}
- if (isLogInfoEnabled) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
- + used_memory);
- }
-
- ReportStats rps = new ReportStats(rp);
+ ReportStats rps = new ReportStats(rp, jc);
mo.preorderMap(rps);
return;
} catch (Exception e) {
@@ -288,17 +259,21 @@ public class ExecMapper extends MapReduc
*/
public static class ReportStats implements Operator.OperatorFunc {
private final Reporter rp;
+ private final Configuration conf;
+ private final String groupName;
- public ReportStats(Reporter rp) {
+ public ReportStats(Reporter rp, Configuration conf) {
this.rp = rp;
+ this.conf = conf;
+ this.groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
}
@Override
public void func(Operator op) {
- Map<Enum<?>, Long> opStats = op.getStats();
- for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
+ Map<String, Long> opStats = op.getStats();
+ for (Map.Entry<String, Long> e : opStats.entrySet()) {
if (rp != null) {
- rp.incrCounter(e.getKey(), e.getValue());
+ rp.incrCounter(groupName, e.getKey(), e.getValue());
}
}
}
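
With operator stats now keyed by String instead of Enum, every counter is reported under the single group configured by HiveConf.ConfVars.HIVECOUNTERGROUP. A standalone sketch of the reporting shape, with printing standing in for Reporter.incrCounter() (group and counter names are illustrative):

    import java.util.HashMap;
    import java.util.Map;

    public class ReportStatsDemo {
      public static void main(String[] args) {
        String groupName = "HIVE";   // stands in for HIVECOUNTERGROUP
        Map<String, Long> opStats = new HashMap<String, Long>();
        opStats.put("RECORDS_OUT_INTERMEDIATE_Map_1", 12345L);
        opStats.put("DESERIALIZE_ERRORS", 0L);

        // mirrors rp.incrCounter(groupName, e.getKey(), e.getValue())
        for (Map.Entry<String, Long> e : opStats.entrySet()) {
          System.out.println(groupName + " / " + e.getKey() + " += " + e.getValue());
        }
      }
    }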
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Thu Oct 30 16:22:33 2014
@@ -70,8 +70,6 @@ public class ExecReducer extends MapRedu
private static final boolean isTraceEnabled = LOG.isTraceEnabled();
private static final String PLAN_KEY = "__REDUCE_PLAN__";
- // used to log memory usage periodically
- private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
// Input value serde needs to be an array to support different SerDe
// for different tags
private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
@@ -86,8 +84,6 @@ public class ExecReducer extends MapRedu
private Reporter rp;
private boolean abort = false;
private boolean isTagged = false;
- private long cntr = 0;
- private long nextCntr = 1;
private TableDesc keyTableDesc;
private TableDesc[] valueTableDesc;
private ObjectInspector[] rowObjectInspector;
@@ -103,8 +99,6 @@ public class ExecReducer extends MapRedu
ObjectInspector keyObjectInspector;
if (isInfoEnabled) {
- LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
try {
LOG.info("conf classpath = "
+ Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
@@ -245,17 +239,7 @@ public class ExecReducer extends MapRedu
row.clear();
row.add(keyObject);
row.add(valueObject[tag]);
- if (isInfoEnabled) {
- cntr++;
- if (cntr == nextCntr) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- if (isInfoEnabled) {
- LOG.info("ExecReducer: processing " + cntr
- + " rows: used memory = " + used_memory);
- }
- nextCntr = getNextCntr(cntr);
- }
- }
+
try {
reducer.processOp(row, tag);
} catch (Exception e) {
@@ -283,17 +267,6 @@ public class ExecReducer extends MapRedu
}
}
- private long getNextCntr(long cntr) {
- // A very simple counter to keep track of number of rows processed by the
- // reducer. It dumps
- // every 1 million times, and quickly before that
- if (cntr >= 1000000) {
- return cntr + 1000000;
- }
-
- return 10 * cntr;
- }
-
@Override
public void close() {
@@ -310,13 +283,9 @@ public class ExecReducer extends MapRedu
}
reducer.endGroup();
}
- if (isInfoEnabled) {
- LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
- + memoryMXBean.getHeapMemoryUsage().getUsed());
- }
reducer.close(abort);
- ReportStats rps = new ReportStats(rp);
+ ReportStats rps = new ReportStats(rp, jc);
reducer.preorderMap(rps);
} catch (Exception e) {
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Thu Oct 30 16:22:33 2014
@@ -238,6 +238,18 @@ public class MapredLocalTask extends Tas
variables.put(HADOOP_OPTS_KEY, hadoopOpts);
}
+ //For Windows OS, we need to pass the HIVE_HADOOP_CLASSPATH Java parameter while starting
+ //HiveServer2 using "-hiveconf hive.hadoop.classpath=%HIVE_LIB%", so the path(s) can be
+ //combined into HADOOP_CLASSPATH.
+ if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)!= null)
+ {
+ if (variables.containsKey("HADOOP_CLASSPATH"))
+ {
+ variables.put("HADOOP_CLASSPATH", variables.get("HADOOP_CLASSPATH") + ";" + HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH));
+ } else {
+ variables.put("HADOOP_CLASSPATH", HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH));
+ }
+ }
+
if(variables.containsKey(MapRedTask.HIVE_DEBUG_RECURSIVE)) {
MapRedTask.configureDebugVariablesForChildJVM(variables);
}
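
A standalone sketch of the HADOOP_CLASSPATH merge above; ";" is the Windows path separator, matching the HiveServer2-on-Windows scenario described in the comment (paths are made up):

    import java.util.HashMap;
    import java.util.Map;

    public class HadoopClasspathDemo {
      public static void main(String[] args) {
        Map<String, String> variables = new HashMap<String, String>();
        variables.put("HADOOP_CLASSPATH", "C:\\hadoop\\lib");
        String hiveHadoopClasspath = "C:\\hive\\lib"; // stands in for HIVE_HADOOP_CLASSPATH

        if (variables.containsKey("HADOOP_CLASSPATH")) {
          variables.put("HADOOP_CLASSPATH",
              variables.get("HADOOP_CLASSPATH") + ";" + hiveHadoopClasspath);
        } else {
          variables.put("HADOOP_CLASSPATH", hiveHadoopClasspath);
        }
        System.out.println(variables.get("HADOOP_CLASSPATH")); // C:\hadoop\lib;C:\hive\lib
      }
    }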
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Thu Oct 30 16:22:33 2014
@@ -21,16 +21,23 @@ package org.apache.hadoop.hive.ql.exec.t
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
+import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -38,6 +45,7 @@ import org.apache.hadoop.io.serializer.S
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.TezGroupedSplit;
import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
@@ -67,17 +75,41 @@ import com.google.common.collect.Multima
import com.google.protobuf.ByteString;
/*
- * Only works with old mapred API
- * Will only work with a single MRInput for now.
+ * This is the central piece for Bucket Map Join and SMB join. It has the following
+ * responsibilities:
+ * 1. Group incoming splits based on bucketing.
+ * 2. Generate new serialized events for the grouped splits.
+ * 3. Create a routing table for the bucket map join and send a serialized version as payload
+ * for the EdgeManager.
+ * 4. For SMB join, generate a grouping according to bucketing for the "small" table side.
*/
public class CustomPartitionVertex extends VertexManagerPlugin {
+ public class PathComparatorForSplit implements Comparator<InputSplit> {
+
+ @Override
+ public int compare(InputSplit inp1, InputSplit inp2) {
+ FileSplit fs1 = (FileSplit) inp1;
+ FileSplit fs2 = (FileSplit) inp2;
+
+ int retval = fs1.getPath().compareTo(fs2.getPath());
+ if (retval != 0) {
+ return retval;
+ }
+
+ if (fs1.getStart() != fs2.getStart()) {
+ return fs1.getStart() < fs2.getStart() ? -1 : 1;
+ }
+
+ return 0;
+ }
+ }
+
private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
VertexManagerPluginContext context;
private InputConfigureVertexTasksEvent configureVertexTaskEvent;
- private List<InputDataInformationEvent> dataInformationEvents;
private int numBuckets = -1;
private Configuration conf = null;
private final SplitGrouper grouper = new SplitGrouper();
@@ -89,6 +121,13 @@ public class CustomPartitionVertex exten
private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
new HashMap<String, Multimap<Integer, InputSplit>>();
+ private int numInputsAffectingRootInputSpecUpdate = 1;
+ private int numInputsSeenSoFar = 0;
+ private final Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
+ private final List<InputSplit> finalSplits = Lists.newLinkedList();
+ private final Map<String, InputSpecUpdate> inputNameInputSpecMap =
+ new HashMap<String, InputSpecUpdate>();
+
public CustomPartitionVertex(VertexManagerPluginContext context) {
super(context);
}
@@ -108,12 +147,13 @@ public class CustomPartitionVertex exten
this.numBuckets = vertexConf.getNumBuckets();
this.mainWorkName = vertexConf.getInputName();
this.vertexType = vertexConf.getVertexType();
+ this.numInputsAffectingRootInputSpecUpdate = vertexConf.getNumInputs();
}
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
int numTasks = context.getVertexNumTasks(context.getVertexName());
- List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
+ List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
for (int i = 0; i < numTasks; ++i) {
scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
@@ -133,8 +173,8 @@ public class CustomPartitionVertex exten
@Override
public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
List<Event> events) {
+ numInputsSeenSoFar++;
LOG.info("On root vertex initialized " + inputName);
-
try {
// This is using the payload from the RootVertexInitializer corresponding
// to InputName. Ideally it should be using it's own configuration class -
@@ -168,20 +208,21 @@ public class CustomPartitionVertex exten
}
boolean dataInformationEventSeen = false;
- Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
+ Map<String, Set<FileSplit>> pathFileSplitsMap = new TreeMap<String, Set<FileSplit>>();
for (Event event : events) {
if (event instanceof InputConfigureVertexTasksEvent) {
// No tasks should have been started yet. Checked by initial state
// check.
+ LOG.info("Got an input configure vertex event for input: " + inputName);
Preconditions.checkState(dataInformationEventSeen == false);
InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
// The vertex cannot be configured until all DataEvents are seen - to
// build the routing table.
configureVertexTaskEvent = cEvent;
- dataInformationEvents =
- Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
+ LOG.info("Configure task for input name: " + inputName + " num tasks: "
+ + configureVertexTaskEvent.getNumTasks());
}
if (event instanceof InputUpdatePayloadEvent) {
// this event can never occur. If it does, fail.
@@ -189,22 +230,26 @@ public class CustomPartitionVertex exten
} else if (event instanceof InputDataInformationEvent) {
dataInformationEventSeen = true;
InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
- dataInformationEvents.add(diEvent);
FileSplit fileSplit;
try {
fileSplit = getFileSplitFromEvent(diEvent);
} catch (IOException e) {
throw new RuntimeException("Failed to get file split for event: " + diEvent);
}
- List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath().getName());
+ Set<FileSplit> fsList =
+ pathFileSplitsMap.get(Utilities.getBucketFileNameFromPathSubString(fileSplit.getPath()
+ .getName()));
if (fsList == null) {
- fsList = new ArrayList<FileSplit>();
- pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
+ fsList = new TreeSet<FileSplit>(new PathComparatorForSplit());
+ pathFileSplitsMap.put(
+ Utilities.getBucketFileNameFromPathSubString(fileSplit.getPath().getName()), fsList);
}
fsList.add(fileSplit);
}
}
+ LOG.info("Path file splits map for input name: " + inputName + " is " + pathFileSplitsMap);
+
Multimap<Integer, InputSplit> bucketToInitialSplitMap =
getBucketSplitMapForPath(pathFileSplitsMap);
@@ -217,77 +262,144 @@ public class CustomPartitionVertex exten
int availableSlots = totalResource / taskResource;
- LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves + " waves.");
+ LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves
+ + " waves. Bucket initial splits map: " + bucketToInitialSplitMap);
JobConf jobConf = new JobConf(conf);
ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
HashMultimap.<Integer, InputSplit> create();
- for (Integer key : bucketToInitialSplitMap.keySet()) {
- InputSplit[] inputSplitArray =
- (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
- Multimap<Integer, InputSplit> groupedSplit =
- HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
- availableSlots, inputName);
- bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
- }
-
- LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap);
- if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) {
+ boolean secondLevelGroupingDone = false;
+ if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) {
+ for (Integer key : bucketToInitialSplitMap.keySet()) {
+ InputSplit[] inputSplitArray =
+ (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+ HiveSplitGenerator hiveSplitGenerator = new HiveSplitGenerator();
+ Multimap<Integer, InputSplit> groupedSplit =
+ hiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+ availableSlots, inputName, mainWorkName.isEmpty());
+ if (mainWorkName.isEmpty() == false) {
+ Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
+ HashMultimap.<Integer, InputSplit> create();
+ singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
+ groupedSplit =
+ grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
+ HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
+ secondLevelGroupingDone = true;
+ }
+ bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+ }
+ processAllEvents(inputName, bucketToGroupedSplitMap, secondLevelGroupingDone);
+ } else {
+ // do not group across files in case of side work because there is only 1 KV reader per
+ // grouped split. This would affect SMB joins where we want to find the smallest key in
+ // all the bucket files.
+ for (Integer key : bucketToInitialSplitMap.keySet()) {
+ HiveSplitGenerator hiveSplitGenerator = new HiveSplitGenerator();
+ InputSplit[] inputSplitArray =
+ (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+ Multimap<Integer, InputSplit> groupedSplit =
+ hiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+ availableSlots, inputName, false);
+ bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+ }
/*
- * this is the small table side. In case of SMB join, we may need to send each split to the
+ * this is the small table side. In case of SMB join, we need to send each split to the
* corresponding bucket-based task on the other side. In case a split needs to go to
* multiple downstream tasks, we need to clone the event and send it to the right
* destination.
*/
- processAllSideEvents(inputName, bucketToGroupedSplitMap);
- } else {
- processAllEvents(inputName, bucketToGroupedSplitMap);
+ LOG.info("This is the side work - multi-mr work.");
+ processAllSideEventsSetParallelism(inputName, bucketToGroupedSplitMap);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
- private void processAllSideEvents(String inputName,
+ private void processAllSideEventsSetParallelism(String inputName,
Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
// the bucket to task map should have been setup by the big table.
+ LOG.info("Processing events for input " + inputName);
if (bucketToTaskMap.isEmpty()) {
+ LOG.info("We don't have a routing table yet. Will need to wait for the main input"
+ + " initialization");
inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
return;
}
+ processAllSideEvents(inputName, bucketToGroupedSplitMap);
+ setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
+ }
+
+ private void processAllSideEvents(String inputName,
+ Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>();
+ LOG.info("We have a routing table and we are going to set the destination tasks for the"
+ + " multi mr inputs. " + bucketToTaskMap);
+
+ Integer[] numSplitsForTask = new Integer[taskCount];
+
+ Multimap<Integer, ByteBuffer> bucketToSerializedSplitMap = LinkedListMultimap.create();
+
+ // Create the list of serialized splits for each bucket.
for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
+ for (InputSplit split : entry.getValue()) {
+ MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+ ByteBuffer bs = serializedSplit.toByteString().asReadOnlyByteBuffer();
+ bucketToSerializedSplitMap.put(entry.getKey(), bs);
+ }
+ }
+
+ for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) {
Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
for (Integer task : destTasks) {
- for (InputSplit split : entry.getValue()) {
- MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+ int count = 0;
+ for (ByteBuffer buf : entry.getValue()) {
+ count++;
InputDataInformationEvent diEvent =
- InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
- .toByteString().asReadOnlyByteBuffer());
+ InputDataInformationEvent.createWithSerializedPayload(count, buf);
diEvent.setTargetIndex(task);
taskEvents.add(diEvent);
}
+ numSplitsForTask[task] = count;
}
}
+ inputNameInputSpecMap.put(inputName,
+ InputSpecUpdate.createPerTaskInputSpecUpdate(Arrays.asList(numSplitsForTask)));
+
+ LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size());
+
context.addRootInputEvents(inputName, taskEvents);
}
private void processAllEvents(String inputName,
- Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
+ Multimap<Integer, InputSplit> bucketToGroupedSplitMap, boolean secondLevelGroupingDone)
+ throws IOException {
- List<InputSplit> finalSplits = Lists.newLinkedList();
+ int totalInputsCount = 0;
+ List<Integer> numSplitsForTask = new ArrayList<Integer>();
for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
int bucketNum = entry.getKey();
Collection<InputSplit> initialSplits = entry.getValue();
finalSplits.addAll(initialSplits);
- for (int i = 0; i < initialSplits.size(); i++) {
+ for (InputSplit inputSplit : initialSplits) {
bucketToTaskMap.put(bucketNum, taskCount);
+ if (secondLevelGroupingDone) {
+ TezGroupedSplit groupedSplit = (TezGroupedSplit) inputSplit;
+ numSplitsForTask.add(groupedSplit.getGroupedSplits().size());
+ totalInputsCount += groupedSplit.getGroupedSplits().size();
+ } else {
+ numSplitsForTask.add(1);
+ totalInputsCount += 1;
+ }
taskCount++;
}
}
+ inputNameInputSpecMap.put(inputName,
+ InputSpecUpdate.createPerTaskInputSpecUpdate(numSplitsForTask));
+
// Construct the EdgeManager descriptor to be used by all edges which need
// the routing table.
EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null;
@@ -297,7 +409,6 @@ public class CustomPartitionVertex exten
UserPayload payload = getBytePayload(bucketToTaskMap);
hiveEdgeManagerDesc.setUserPayload(payload);
}
- Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
// Replace the edge manager for all vertices which have routing type custom.
for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
@@ -308,42 +419,67 @@ public class CustomPartitionVertex exten
}
}
- LOG.info("Task count is " + taskCount);
+ LOG.info("Task count is " + taskCount + " for input name: " + inputName);
- List<InputDataInformationEvent> taskEvents =
- Lists.newArrayListWithCapacity(finalSplits.size());
+ List<InputDataInformationEvent> taskEvents = Lists.newArrayListWithCapacity(totalInputsCount);
// Re-serialize the splits after grouping.
int count = 0;
for (InputSplit inputSplit : finalSplits) {
- MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
- InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
- count, serializedSplit.toByteString().asReadOnlyByteBuffer());
- diEvent.setTargetIndex(count);
+ if (secondLevelGroupingDone) {
+ TezGroupedSplit tezGroupedSplit = (TezGroupedSplit)inputSplit;
+ for (InputSplit subSplit : tezGroupedSplit.getGroupedSplits()) {
+ if ((subSplit instanceof TezGroupedSplit) == false) {
+ throw new IOException("Unexpected split type found: "
+ + subSplit.getClass().getCanonicalName());
+ }
+ MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(subSplit);
+ InputDataInformationEvent diEvent =
+ InputDataInformationEvent.createWithSerializedPayload(count, serializedSplit
+ .toByteString().asReadOnlyByteBuffer());
+ diEvent.setTargetIndex(count);
+ taskEvents.add(diEvent);
+ }
+ } else {
+ MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+ InputDataInformationEvent diEvent =
+ InputDataInformationEvent.createWithSerializedPayload(count, serializedSplit
+ .toByteString().asReadOnlyByteBuffer());
+ diEvent.setTargetIndex(count);
+ taskEvents.add(diEvent);
+ }
count++;
- taskEvents.add(diEvent);
- }
-
- // Replace the Edge Managers
- Map<String, InputSpecUpdate> rootInputSpecUpdate =
- new HashMap<String, InputSpecUpdate>();
- rootInputSpecUpdate.put(
- inputName,
- InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
- if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) {
- context.setVertexParallelism(
- taskCount,
- VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
- .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
}
// Set the actual events for the tasks.
+ LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size());
context.addRootInputEvents(inputName, taskEvents);
if (inputToGroupedSplitMap.isEmpty() == false) {
for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
processAllSideEvents(entry.getKey(), entry.getValue());
}
+ setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
inputToGroupedSplitMap.clear();
}
+
+ // Only done when it is a bucket map join alone, with no SMB.
+ if (numInputsAffectingRootInputSpecUpdate == 1) {
+ setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
+ }
+ }
+
+ private void
+ setVertexParallelismAndRootInputSpec(Map<String, InputSpecUpdate> rootInputSpecUpdate)
+ throws IOException {
+ if (numInputsAffectingRootInputSpecUpdate != numInputsSeenSoFar) {
+ return;
+ }
+
+ LOG.info("Setting vertex parallelism since we have seen all inputs.");
+
+ context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper
+ .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap,
+ rootInputSpecUpdate);
+ finalSplits.clear();
}
UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
@@ -377,14 +513,14 @@ public class CustomPartitionVertex exten
* This method generates the map of bucket to file splits.
*/
private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
- Map<String, List<FileSplit>> pathFileSplitsMap) {
+ Map<String, Set<FileSplit>> pathFileSplitsMap) {
int bucketNum = 0;
Multimap<Integer, InputSplit> bucketToInitialSplitMap =
ArrayListMultimap.<Integer, InputSplit> create();
- for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
+ for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
int bucketId = bucketNum % numBuckets;
for (FileSplit fsplit : entry.getValue()) {
bucketToInitialSplitMap.put(bucketId, fsplit);
@@ -392,6 +528,11 @@ public class CustomPartitionVertex exten
bucketNum++;
}
+ // this is just for the SMB join use-case. numBuckets is that of the big table and the small
+ // table could have a smaller number of buckets. In this case, we want to send the data from
+ // the right buckets to the big table side. For example, if the big table has 8 buckets and the
+ // small table has 4 buckets, bucket 0 of the small table needs to be sent to bucket 4 of the
+ // big table as well.
if (bucketNum < numBuckets) {
int loopedBucketId = 0;
for (; bucketNum < numBuckets; bucketNum++) {
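
A standalone sketch of the wrap-around described in the comment above, for the 8-vs-4 bucket example: small-table buckets are re-assigned round-robin until every big-table bucket has a source.

    public class BucketRoundRobinDemo {
      public static void main(String[] args) {
        int numBuckets = 8;        // big table
        int smallTableBuckets = 4; // small table
        for (int bucketNum = 0; bucketNum < numBuckets; bucketNum++) {
          // bucket 4 of the big table receives small-table bucket 0, and so on
          System.out.println("big bucket " + bucketNum
              + " <- small bucket " + (bucketNum % smallTableBuckets));
        }
      }
    }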
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java Thu Oct 30 16:22:33 2014
@@ -29,20 +29,31 @@ import org.apache.hadoop.io.Writable;
* This class is the payload for the custom vertex. It serializes and de-serializes the following:
* @numBuckets: the number of buckets of the "big table"
* @vertexType: this is the type of vertex and differentiates between bucket map join and SMB joins
- * @inputName: This is the name of the input. Used in case of SMB joins
+ * @numInputs: The number of inputs that are directly connected to the vertex (MRInput/MultiMRInput).
+ * In case of bucket map join, it is always 1.
+ * @inputName: This is the name of the input. Used in case of SMB joins. Empty in case of BucketMapJoin
*/
public class CustomVertexConfiguration implements Writable {
private int numBuckets;
private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES;
+ private int numInputs;
private String inputName;
public CustomVertexConfiguration() {
}
- public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) {
+ // this is the constructor to use for the Bucket map join case.
+ public CustomVertexConfiguration(int numBuckets, VertexType vertexType) {
+ this(numBuckets, vertexType, "", 1);
+ }
+
+ // this is the constructor to use for SMB.
+ public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName,
+ int numInputs) {
this.numBuckets = numBuckets;
this.vertexType = vertexType;
+ this.numInputs = numInputs;
this.inputName = inputName;
}
@@ -50,6 +61,7 @@ public class CustomVertexConfiguration i
public void write(DataOutput out) throws IOException {
out.writeInt(this.vertexType.ordinal());
out.writeInt(this.numBuckets);
+ out.writeInt(numInputs);
out.writeUTF(inputName);
}
@@ -57,6 +69,7 @@ public class CustomVertexConfiguration i
public void readFields(DataInput in) throws IOException {
this.vertexType = VertexType.values()[in.readInt()];
this.numBuckets = in.readInt();
+ this.numInputs = in.readInt();
this.inputName = in.readUTF();
}
@@ -71,4 +84,8 @@ public class CustomVertexConfiguration i
public String getInputName() {
return inputName;
}
+
+ public int getNumInputs() {
+ return numInputs;
+ }
}
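
Because the payload travels as a Writable, write() and readFields() above must agree on field order (vertex type ordinal, numBuckets, numInputs, inputName). A minimal round-trip sketch using Hadoop's buffer classes (assumes the classes above are on the classpath; values are illustrative):

    import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
    import org.apache.hadoop.io.DataInputBuffer;
    import org.apache.hadoop.io.DataOutputBuffer;

    public class VertexConfRoundTripDemo {
      public static void main(String[] args) throws Exception {
        CustomVertexConfiguration written =
            new CustomVertexConfiguration(8, VertexType.AUTO_INITIALIZED_EDGES, "Map 1", 2);
        DataOutputBuffer dob = new DataOutputBuffer();
        written.write(dob);

        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(dob.getData(), dob.getLength());
        CustomVertexConfiguration read = new CustomVertexConfiguration();
        read.readFields(dib);
        System.out.println(read.getNumInputs() + " " + read.getInputName()); // 2 Map 1
      }
    }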
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Thu Oct 30 16:22:33 2014
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
import com.google.common.base.Function;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
import javax.security.auth.login.LoginException;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -49,7 +47,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -110,16 +108,13 @@ import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.mapreduce.output.MROutput;
import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
@@ -178,6 +173,8 @@ public class DagUtils {
private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork mapWork) {
JobConf conf = new JobConf(baseConf);
+ conf.set(Operator.CONTEXT_NAME_KEY, mapWork.getName());
+
if (mapWork.getNumMapTasks() != null) {
// Is this required ?
conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
@@ -268,8 +265,7 @@ public class DagUtils {
case CUSTOM_EDGE: {
mergeInputClass = ConcatenatedMergedKeyValueInput.class;
int numBuckets = edgeProp.getNumBuckets();
- CustomVertexConfiguration vertexConf =
- new CustomVertexConfiguration(numBuckets, vertexType, "");
+ CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
DataOutputBuffer dob = new DataOutputBuffer();
vertexConf.write(dob);
VertexManagerPluginDescriptor desc =
@@ -314,8 +310,7 @@ public class DagUtils {
switch(edgeProp.getEdgeType()) {
case CUSTOM_EDGE: {
int numBuckets = edgeProp.getNumBuckets();
- CustomVertexConfiguration vertexConf =
- new CustomVertexConfiguration(numBuckets, vertexType, "");
+ CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
DataOutputBuffer dob = new DataOutputBuffer();
vertexConf.write(dob);
VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
@@ -340,7 +335,6 @@ public class DagUtils {
/*
* Helper function to create an edge property from an edge type.
*/
- @SuppressWarnings("rawtypes")
private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
throws IOException {
MRHelpers.translateMRConfToTez(conf);
@@ -431,8 +425,9 @@ public class DagUtils {
int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
- int cpus = conf.getInt(MRJobConfig.MAP_CPU_VCORES,
- MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+ int cpus = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) > 0 ?
+ HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) :
+ conf.getInt(MRJobConfig.MAP_CPU_VCORES, MRJobConfig.DEFAULT_MAP_CPU_VCORES);
return Resource.newInstance(memory, cpus);
}
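
The vcores change mirrors the memory lookup directly above it: a positive Hive-level value takes precedence, otherwise the MR-level value applies, with the MR default as the last resort. A standalone sketch of that precedence pattern, using a plain map and hypothetical key strings rather than the real HiveConf/JobConf API:

import java.util.HashMap;
import java.util.Map;

public class ConfPrecedenceSketch {
  // A positive Hive-level override wins; otherwise fall back to the
  // MR-level value, and finally to the hard-coded MR default.
  static int resolve(Map<String, Integer> conf, String hiveKey, String mrKey, int mrDefault) {
    Integer hiveVal = conf.get(hiveKey);
    if (hiveVal != null && hiveVal > 0) {
      return hiveVal;
    }
    Integer mrVal = conf.get(mrKey);
    return mrVal != null ? mrVal : mrDefault;
  }

  public static void main(String[] args) {
    Map<String, Integer> conf = new HashMap<String, Integer>();
    conf.put("hive.tez.cpu.vcores", 4);
    // Hive override present, so the MR setting is never consulted.
    System.out.println(resolve(conf, "hive.tez.cpu.vcores", "mapreduce.map.cpu.vcores", 1)); // 4
    conf.remove("hive.tez.cpu.vcores");
    System.out.println(resolve(conf, "hive.tez.cpu.vcores", "mapreduce.map.cpu.vcores", 1)); // 1
  }
}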
@@ -452,17 +447,29 @@ public class DagUtils {
*/
private String getContainerJavaOpts(Configuration conf) {
String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS);
- if (javaOpts != null && !javaOpts.isEmpty()) {
- String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
- List<String> logProps = Lists.newArrayList();
- TezUtils.addLog4jSystemProperties(logLevel, logProps);
- StringBuilder sb = new StringBuilder();
- for (String str : logProps) {
- sb.append(str).append(" ");
+
+ String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
+ List<String> logProps = Lists.newArrayList();
+ TezUtils.addLog4jSystemProperties(logLevel, logProps);
+ StringBuilder sb = new StringBuilder();
+ for (String str : logProps) {
+ sb.append(str).append(" ");
+ }
+ logLevel = sb.toString();
+
+ if (HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0) {
+ if (javaOpts != null) {
+ return javaOpts + " " + logLevel;
+ } else {
+ return logLevel;
+ }
+ } else {
+ if (javaOpts != null && !javaOpts.isEmpty()) {
+ LOG.warn(HiveConf.ConfVars.HIVETEZJAVAOPTS + " will be ignored because "
+ + HiveConf.ConfVars.HIVETEZCONTAINERSIZE + " is not set!");
}
- return javaOpts + " " + sb.toString();
+ return logLevel + " " + MRHelpers.getJavaOptsForMRMapper(conf);
}
- return MRHelpers.getJavaOptsForMRMapper(conf);
}
private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
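
Restating the new control flow in getContainerJavaOpts: the log4j properties are now always built, hive.tez.java.opts is honored only when hive.tez.container.size is set, and otherwise the MR mapper opts win and a warning is logged. A simplified, self-contained sketch of that branching, with plain strings standing in for the conf lookups:

public class JavaOptsSketch {
  static String containerJavaOpts(String hiveJavaOpts, int hiveContainerSizeMb,
      String logOpts, String mrMapperOpts) {
    if (hiveContainerSizeMb > 0) {
      // Hive sizes the container, so Hive's opts (if any) plus log settings apply.
      return hiveJavaOpts == null ? logOpts : hiveJavaOpts + " " + logOpts;
    }
    if (hiveJavaOpts != null && !hiveJavaOpts.isEmpty()) {
      // Matches the LOG.warn in the hunk above: the opts are dropped
      // unless the container size is also under Hive's control.
      System.err.println("hive.tez.java.opts ignored: hive.tez.container.size is not set");
    }
    return logOpts + " " + mrMapperOpts;
  }

  public static void main(String[] args) {
    System.out.println(containerJavaOpts("-Xmx3g", 4096, "-Dlog.level=INFO", "-Xmx1g"));
    // -> -Xmx3g -Dlog.level=INFO
    System.out.println(containerJavaOpts("-Xmx3g", -1, "-Dlog.level=INFO", "-Xmx1g"));
    // warns, then -> -Dlog.level=INFO -Xmx1g
  }
}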
@@ -473,13 +480,9 @@ public class DagUtils {
if (mergeJoinWork.getMainWork() instanceof MapWork) {
List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
- CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
Vertex mergeVx =
createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
- // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
- // here would cause pre-mature grouping which would be incorrect.
- Class inputFormatClass = HiveInputFormat.class;
conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
// mapreduce.tez.input.initializer.serialize.event.payload should be set
// to false when using this plug-in to avoid getting a serialized event at run-time.
@@ -496,9 +499,11 @@ public class DagUtils {
VertexManagerPluginDescriptor desc =
VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+ // The +1 to the size accounts for the main work itself.
CustomVertexConfiguration vertexConf =
new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
- .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias());
+ .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias(),
+ mapWorkList.size() + 1);
DataOutputBuffer dob = new DataOutputBuffer();
vertexConf.write(dob);
byte[] userPayload = dob.getData();
@@ -538,6 +543,7 @@ public class DagUtils {
DataSourceDescriptor dataSource;
int numTasks = -1;
+ @SuppressWarnings("rawtypes")
Class inputFormatClass = conf.getClass("mapred.input.format.class",
InputFormat.class);
@@ -595,7 +601,13 @@ public class DagUtils {
.setCustomInitializerDescriptor(descriptor).build();
} else {
// Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
- dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+ if (vertexHasCustomInput) {
+ dataSource =
+ MultiMRInput.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+ } else {
+ dataSource =
+ MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+ }
}
} else {
// Setup client side split generation.
@@ -640,6 +652,8 @@ public class DagUtils {
private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) {
JobConf conf = new JobConf(baseConf);
+ conf.set(Operator.CONTEXT_NAME_KEY, reduceWork.getName());
+
// Is this required ?
conf.set("mapred.reducer.class", ExecReducer.class.getName());
@@ -745,6 +759,7 @@ public class DagUtils {
* @throws LoginException if we are unable to figure user information
* @throws IOException when any dfs operation fails.
*/
+ @SuppressWarnings("deprecation")
public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
@@ -857,6 +872,7 @@ public class DagUtils {
return fstatus;
}
+ @SuppressWarnings("deprecation")
public static FileStatus validateTargetDir(Path path, Configuration conf) throws IOException {
FileSystem fs = path.getFileSystem(conf);
FileStatus fstatus = null;
@@ -971,7 +987,7 @@ public class DagUtils {
public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
hiveConf.setBoolean("mapred.mapper.new-api", false);
- JobConf conf = new JobConf(hiveConf);
+ JobConf conf = new JobConf(new TezConfiguration(hiveConf));
conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
@@ -1033,6 +1049,7 @@ public class DagUtils {
* @param ctx This query's context
* @return Vertex
*/
+ @SuppressWarnings("deprecation")
public Vertex createVertex(JobConf conf, BaseWork work,
Path scratchDir, LocalResource appJarLr,
List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Thu Oct 30 16:22:33 2014
@@ -87,6 +87,8 @@ public class DynamicPartitionPruner {
private final Object endOfEvents = new Object();
+ private int totalEventCount = 0;
+
public DynamicPartitionPruner() {
}
@@ -114,7 +116,7 @@ public class DynamicPartitionPruner {
// synchronous event processing loop. Won't return until all events have
// been processed.
this.processEvents();
- this.prunePartitions(work);
+ this.prunePartitions(work, context);
LOG.info("Ok to proceed.");
}
@@ -163,12 +165,22 @@ public class DynamicPartitionPruner {
}
}
- private void prunePartitions(MapWork work) throws HiveException {
+ private void prunePartitions(MapWork work, InputInitializerContext context) throws HiveException {
+ int expectedEvents = 0;
for (String source : this.sourceInfoMap.keySet()) {
for (SourceInfo si : this.sourceInfoMap.get(source)) {
+ int taskNum = context.getVertexNumTasks(source);
+ LOG.info("Expecting " + taskNum + " events for vertex " + source);
+ expectedEvents += taskNum;
prunePartitionSingleSource(source, si, work);
}
}
+
+ // Sanity check: all tasks must submit events for us to succeed.
+ if (expectedEvents != totalEventCount) {
+ LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount);
+ throw new HiveException("Incorrect event count in dynamic partition pruning");
+ }
}
private void prunePartitionSingleSource(String source, SourceInfo si, MapWork work)
@@ -396,7 +408,8 @@ public class DynamicPartitionPruner {
public void addEvent(InputInitializerEvent event) {
synchronized(sourcesWaitingForEvents) {
if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
- queue.offer(event);
+ ++totalEventCount;
+ queue.offer(event);
}
}
}
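
The accounting above is simple but strict: each upstream task is expected to deliver exactly one pruning event, so the sum of getVertexNumTasks over all registered sources must equal the counter incremented in addEvent; pruning on partial data could silently drop partitions that are actually needed. A toy model of that check, independent of the Tez API:

import java.util.HashMap;
import java.util.Map;

public class EventCountSketch {
  // tasksPerSource: tasks per source vertex (one expected event each);
  // receivedEvents: events that actually made it onto the queue.
  static void verify(Map<String, Integer> tasksPerSource, int receivedEvents) {
    int expected = 0;
    for (int tasks : tasksPerSource.values()) {
      expected += tasks;
    }
    if (expected != receivedEvents) {
      throw new IllegalStateException(
          "Expecting: " + expected + ", received: " + receivedEvents);
    }
  }

  public static void main(String[] args) {
    Map<String, Integer> tasks = new HashMap<String, Integer>();
    tasks.put("Map 1", 3);
    tasks.put("Map 2", 2);
    verify(tasks, 5); // passes: all five tasks reported
    verify(tasks, 4); // throws: a task never submitted its event
  }
}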
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Thu Oct 30 16:22:33 2014
@@ -18,6 +18,8 @@
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,7 +66,6 @@ import com.google.common.collect.Multima
* making sure that splits from different partitions are only grouped if they
* are of the same schema, format and serde
*/
-@SuppressWarnings("deprecation")
public class HiveSplitGenerator extends InputInitializer {
private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
@@ -72,11 +73,17 @@ public class HiveSplitGenerator extends
private static final SplitGrouper grouper = new SplitGrouper();
private final DynamicPartitionPruner pruner = new DynamicPartitionPruner();
private InputInitializerContext context;
+ private static Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache =
+ new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>();
public HiveSplitGenerator(InputInitializerContext initializerContext) {
super(initializerContext);
}
+ public HiveSplitGenerator() {
+ this(null);
+ }
+
@Override
public List<Event> initialize() throws Exception {
InputInitializerContext rootInputContext = getContext();
@@ -150,58 +157,28 @@ public class HiveSplitGenerator extends
}
- public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+ public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
Configuration conf, InputSplit[] splits, float waves, int availableSlots)
throws Exception {
- return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null);
+ return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
}
- public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
- Configuration conf, InputSplit[] splits, float waves, int availableSlots,
- String inputName) throws Exception {
-
- MapWork work = null;
- if (inputName != null) {
- work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
- // work can still be null if there is no merge work for this input
- }
- if (work == null) {
- work = Utilities.getMapWork(jobConf);
- }
+ public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+ Configuration conf, InputSplit[] splits, float waves, int availableSlots, String inputName,
+ boolean groupAcrossFiles) throws Exception {
+ MapWork work = populateMapWork(jobConf, inputName);
Multimap<Integer, InputSplit> bucketSplitMultiMap =
ArrayListMultimap.<Integer, InputSplit> create();
- Class<?> previousInputFormatClass = null;
- String previousDeserializerClass = null;
- Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache =
- new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>();
-
int i = 0;
-
+ InputSplit prevSplit = null;
for (InputSplit s : splits) {
// this is the bit where we make sure we don't group across partition
// schema boundaries
-
- Path path = ((FileSplit) s).getPath();
-
- PartitionDesc pd =
- HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
- path, cache);
-
- String currentDeserializerClass = pd.getDeserializerClassName();
- Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
-
- if ((currentInputFormatClass != previousInputFormatClass)
- || (!currentDeserializerClass.equals(previousDeserializerClass))) {
+ if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
++i;
- }
-
- previousInputFormatClass = currentInputFormatClass;
- previousDeserializerClass = currentDeserializerClass;
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Adding split " + path + " to src group " + i);
+ prevSplit = s;
}
bucketSplitMultiMap.put(i, s);
}
@@ -214,6 +191,54 @@ public class HiveSplitGenerator extends
return groupedSplits;
}
+ private MapWork populateMapWork(JobConf jobConf, String inputName) {
+ MapWork work = null;
+ if (inputName != null) {
+ work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
+ // work can still be null if there is no merge work for this input
+ }
+ if (work == null) {
+ work = Utilities.getMapWork(jobConf);
+ }
+
+ return work;
+ }
+
+ public boolean schemaEvolved(InputSplit s, InputSplit prevSplit, boolean groupAcrossFiles,
+ MapWork work) throws IOException {
+ boolean retval = false;
+ Path path = ((FileSplit) s).getPath();
+ PartitionDesc pd =
+ HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+ path, cache);
+ String currentDeserializerClass = pd.getDeserializerClassName();
+ Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
+
+ Class<?> previousInputFormatClass = null;
+ String previousDeserializerClass = null;
+ if (prevSplit != null) {
+ Path prevPath = ((FileSplit) prevSplit).getPath();
+ if (!groupAcrossFiles) {
+ return !path.equals(prevPath);
+ }
+ PartitionDesc prevPD =
+ HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+ prevPath, cache);
+ previousDeserializerClass = prevPD.getDeserializerClassName();
+ previousInputFormatClass = prevPD.getInputFileFormatClass();
+ }
+
+ if ((currentInputFormatClass != previousInputFormatClass)
+ || (!currentDeserializerClass.equals(previousDeserializerClass))) {
+ retval = true;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Adding split " + path + " to src new group? " + retval);
+ }
+ return retval;
+ }
+
private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Thu Oct 30 16:22:33 2014
@@ -17,10 +17,10 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -44,15 +44,15 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
-import org.apache.hadoop.hive.ql.io.IOContext;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.mapred.JobConf;
import org.apache.tez.mapreduce.input.MRInputLegacy;
import org.apache.tez.mapreduce.input.MultiMRInput;
import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.Input;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.ProcessorContext;
@@ -73,6 +73,7 @@ public class MapRecordProcessor extends
private int position = 0;
private boolean foundCachedMergeWork = false;
MRInputLegacy legacyMRInput = null;
+ MultiMRInput mainWorkMultiMRInput = null;
private ExecMapperContext execContext = null;
private boolean abort = false;
protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
@@ -129,12 +130,14 @@ public class MapRecordProcessor extends
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
super.init(jconf, processorContext, mrReporter, inputs, outputs);
- //Update JobConf using MRInput, info like filename comes via this
+ // Update JobConf using MRInput, info like filename comes via this
legacyMRInput = getMRInput(inputs);
- Configuration updatedConf = legacyMRInput.getConfigUpdates();
- if (updatedConf != null) {
- for (Entry<String, String> entry : updatedConf) {
- jconf.set(entry.getKey(), entry.getValue());
+ if (legacyMRInput != null) {
+ Configuration updatedConf = legacyMRInput.getConfigUpdates();
+ if (updatedConf != null) {
+ for (Entry<String, String> entry : updatedConf) {
+ jconf.set(entry.getKey(), entry.getValue());
+ }
}
}
@@ -158,8 +161,6 @@ public class MapRecordProcessor extends
if (mergeWorkList != null) {
MapOperator mergeMapOp = null;
for (MapWork mergeMapWork : mergeWorkList) {
- processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs
- .get(mergeMapWork.getName()))));
if (mergeMapWork.getVectorMode()) {
mergeMapOp = new VectorMapOperator();
} else {
@@ -235,11 +236,17 @@ public class MapRecordProcessor extends
}
private void initializeMapRecordSources() throws Exception {
+
int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself
sources = new MapRecordSource[size];
- KeyValueReader reader = legacyMRInput.getReader();
position = mapOp.getConf().getTag();
sources[position] = new MapRecordSource();
+ KeyValueReader reader = null;
+ if (mainWorkMultiMRInput != null) {
+ reader = getKeyValueReader(mainWorkMultiMRInput.getKeyValueReaders(), mapOp);
+ } else {
+ reader = legacyMRInput.getReader();
+ }
sources[position].init(jconf, mapOp, reader);
for (MapOperator mapOp : mergeMapOpList) {
int tag = mapOp.getConf().getTag();
@@ -248,13 +255,28 @@ public class MapRecordProcessor extends
MultiMRInput multiMRInput = multiMRInputMap.get(inputName);
Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders();
l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName);
- List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders);
- reader = new KeyValueInputMerger(kvReaderList);
+ reader = getKeyValueReader(kvReaders, mapOp);
sources[tag].init(jconf, mapOp, reader);
}
((TezContext) MapredContext.get()).setRecordSources(sources);
}
+ @SuppressWarnings("deprecation")
+ private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders,
+ MapOperator mapOp)
+ throws Exception {
+ List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders);
+ // this sets up the map operator contexts correctly
+ mapOp.initializeContexts();
+ Deserializer deserializer = mapOp.getCurrentDeserializer();
+ KeyValueReader reader =
+ new KeyValueInputMerger(kvReaderList, deserializer,
+ new ObjectInspector[] { deserializer.getObjectInspector() }, mapOp
+ .getConf()
+ .getSortCols());
+ return reader;
+ }
+
private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) {
for (Operator<? extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) {
if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) {
@@ -269,11 +291,7 @@ public class MapRecordProcessor extends
@Override
void run() throws Exception {
- while (sources[position].pushRecord()) {
- if (isLogInfoEnabled) {
- logProgress();
- }
- }
+ while (sources[position].pushRecord()) {}
}
@Override
@@ -305,10 +323,7 @@ public class MapRecordProcessor extends
}
}
- if (isLogInfoEnabled) {
- logCloseInfo();
- }
- ReportStats rps = new ReportStats(reporter);
+ ReportStats rps = new ReportStats(reporter, jconf);
mapOp.preorderMap(rps);
return;
} catch (Exception e) {
@@ -342,7 +357,17 @@ public class MapRecordProcessor extends
multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue());
}
}
- theMRInput.init();
+ if (theMRInput != null) {
+ theMRInput.init();
+ } else {
+ String alias = mapWork.getAliasToWork().keySet().iterator().next();
+ if (inputs.get(alias) instanceof MultiMRInput) {
+ mainWorkMultiMRInput = (MultiMRInput) inputs.get(alias);
+ } else {
+ throw new IOException("Unexpected input type found: "
+ + inputs.get(alias).getClass().getCanonicalName());
+ }
+ }
return theMRInput;
}
}
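
getKeyValueReader now hands every grouped input, including the main work's, to a KeyValueInputMerger built with the deserializer and sort columns, so a merge join consumes a single stream in key order. The underlying idea is a k-way merge of individually sorted readers; below is a generic sketch of that merge using a priority queue over strings, not the Hive/Tez reader classes:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {
  // One heap entry per input stream: its current head plus the rest of it.
  static final class Head {
    final String value;
    final Iterator<String> rest;
    Head(String value, Iterator<String> rest) {
      this.value = value;
      this.rest = rest;
    }
  }

  // Merge individually sorted iterators into one sorted list, the way
  // KeyValueInputMerger presents several sorted readers as a single reader.
  static List<String> merge(List<Iterator<String>> inputs) {
    PriorityQueue<Head> heap = new PriorityQueue<Head>(Math.max(1, inputs.size()),
        new Comparator<Head>() {
          public int compare(Head a, Head b) {
            return a.value.compareTo(b.value);
          }
        });
    for (Iterator<String> it : inputs) {
      if (it.hasNext()) {
        heap.add(new Head(it.next(), it));
      }
    }
    List<String> out = new ArrayList<String>();
    while (!heap.isEmpty()) {
      Head h = heap.poll();
      out.add(h.value);
      if (h.rest.hasNext()) {
        heap.add(new Head(h.rest.next(), h.rest)); // refill from the same stream
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Iterator<String>> readers = new ArrayList<Iterator<String>>();
    readers.add(Arrays.asList("a", "d", "g").iterator());
    readers.add(Arrays.asList("b", "e").iterator());
    readers.add(Arrays.asList("c", "f", "h").iterator());
    System.out.println(merge(readers)); // [a, b, c, d, e, f, g, h]
  }
}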
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java Thu Oct 30 16:22:33 2014
@@ -19,7 +19,6 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.mapreduce.input.MRInput;
import org.apache.tez.runtime.library.api.KeyValueReader;
/**
@@ -45,7 +43,7 @@ public class MapRecordSource implements
private final boolean grouped = false;
void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException {
- execContext = new ExecMapperContext(jconf);
+ execContext = mapOp.getExecContext();
this.mapOp = mapOp;
this.reader = reader;
}