Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/12 19:23:14 UTC
svn commit: r1541190 [4/15] - in /hive/branches/tez: ./
ant/src/org/apache/hadoop/hive/ant/ beeline/
beeline/src/java/org/apache/hive/beeline/ cli/
cli/src/java/org/apache/hadoop/hive/cli/ common/
common/src/java/org/apache/hadoop/hive/common/ common/s...
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Nov 12 18:23:05 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
+import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -46,6 +47,7 @@ import org.apache.hadoop.io.BinaryCompar
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
/**
* Reduce Sink Operator sends output to the reduce stage.
@@ -54,6 +56,7 @@ public class ReduceSinkOperator extends
implements Serializable, TopNHash.BinaryCollector {
private static final long serialVersionUID = 1L;
+ protected transient OutputCollector out;
/**
* The evaluators for the key columns. Key columns decide the sort order on
@@ -91,6 +94,10 @@ public class ReduceSinkOperator extends
return inputAlias;
}
+ public void setOutputCollector(OutputCollector _out) {
+ this.out = _out;
+ }
+
// picks topN K:V pairs from input.
protected transient TopNHash reducerHash = new TopNHash();
@Override
@@ -153,8 +160,8 @@ public class ReduceSinkOperator extends
transient InspectableObject tempInspectableObject = new InspectableObject();
protected transient HiveKey keyWritable = new HiveKey();
- transient StructObjectInspector keyObjectInspector;
- transient StructObjectInspector valueObjectInspector;
+ protected transient ObjectInspector keyObjectInspector;
+ protected transient ObjectInspector valueObjectInspector;
transient ObjectInspector[] partitionObjectInspectors;
protected transient Object[] cachedValues;
@@ -173,6 +180,7 @@ public class ReduceSinkOperator extends
* in this case, child GBY evaluates distinct values with expression like KEY.col2:0.dist1
* see {@link ExprNodeColumnEvaluator}
*/
+ // TODO: we only ever use one row of these at a time. Why do we need to cache multiple?
protected transient Object[][] cachedKeys;
boolean firstRow;
protected transient Random random;
@@ -237,51 +245,41 @@ public class ReduceSinkOperator extends
.getOutputValueColumnNames(), rowInspector);
partitionObjectInspectors = initEvaluators(partitionEval, rowInspector);
int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
- int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
- numDistributionKeys;
+ int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 : numDistributionKeys;
cachedKeys = new Object[numKeys][keyLen];
cachedValues = new Object[valueEval.length];
}
- // Evaluate the keys
- for (int i = 0; i < numDistributionKeys; i++) {
- cachedKeys[0][i] = keyEval[i].evaluate(row);
- }
+ // Determine distKeyLength (w/o distincts), then add the first distinct key if present.
+ populateCachedDistributionKeys(row, 0);
+ HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
+ int distKeyLength = firstKey.getDistKeyLength();
if (numDistinctExprs > 0) {
- // with distinct key(s)
- for (int i = 0; i < numDistinctExprs; i++) {
- if (i > 0) {
- System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
- }
- StandardUnion union = (StandardUnion) cachedKeys[i][numDistributionKeys];
- if (union == null) {
- cachedKeys[i][numDistributionKeys] =
- union = new StandardUnion((byte)i, new Object[distinctColIndices.get(i).size()]);
- }
- Object[] distinctParameters = (Object[]) union.getObject();
- for (int j = 0; j < distinctParameters.length; j++) {
- distinctParameters[j] =
- keyEval[distinctColIndices.get(i).get(j)].evaluate(row);
- }
- union.setTag((byte) i);
- }
+ populateCachedDistinctKeys(row, 0);
+ firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
}
- for (int i = 0; i < cachedKeys.length; i++) {
- // Serialize the keys and append the tag
- Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
- setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
- int topNIndex = reducerHash.tryStoreKey(keyWritable);
- if (TopNHash.EXCLUDED == topNIndex) continue;
- int keyHashCode = computeHashCode(row);
- BytesWritable valueWritable = getValue(row);
- if (TopNHash.FORWARD == topNIndex) {
- keyWritable.setHashCode(keyHashCode);
- collect(keyWritable, valueWritable);
- continue;
- }
- assert topNIndex >= 0;
- reducerHash.storeValue(topNIndex, valueWritable, keyHashCode, false);
+ // Try to store the first key. If it's not excluded, we will proceed.
+ int firstIndex = reducerHash.tryStoreKey(firstKey);
+ if (firstIndex == TopNHash.EXCLUDE) return; // Nothing to do.
+ // Compute value and hashcode - we'd either store or forward them.
+ BytesWritable value = makeValueWritable(row);
+ int hashCode = computeHashCode(row);
+ if (firstIndex == TopNHash.FORWARD) {
+ firstKey.setHashCode(hashCode);
+ collect(firstKey, value);
+ } else {
+ assert firstIndex >= 0;
+ reducerHash.storeValue(firstIndex, value, hashCode, false);
+ }
+
+ // All other distinct keys will just be forwarded. This could be optimized...
+ for (int i = 1; i < numDistinctExprs; i++) {
+ System.arraycopy(cachedKeys[0], 0, cachedKeys[i], 0, numDistributionKeys);
+ populateCachedDistinctKeys(row, i);
+ HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+ hiveKey.setHashCode(hashCode);
+ collect(hiveKey, value);
}
} catch (HiveException e) {
throw e;
@@ -290,14 +288,38 @@ public class ReduceSinkOperator extends
}
}
+ private void populateCachedDistributionKeys(Object row, int index) throws HiveException {
+ for (int i = 0; i < numDistributionKeys; i++) {
+ cachedKeys[index][i] = keyEval[i].evaluate(row);
+ }
+ if (cachedKeys[0].length > numDistributionKeys) {
+ cachedKeys[index][numDistributionKeys] = null;
+ }
+ }
+
+ /**
+ * Populate distinct keys part of cachedKeys for a particular row.
+ * @param row the row
+ * @param index the cachedKeys index to write to
+ */
+ private void populateCachedDistinctKeys(Object row, int index) throws HiveException {
+ StandardUnion union;
+ cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
+ (byte)index, new Object[distinctColIndices.get(index).size()]);
+ Object[] distinctParameters = (Object[]) union.getObject();
+ for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
+ distinctParameters[distinctParamI] =
+ keyEval[distinctColIndices.get(index).get(distinctParamI)].evaluate(row);
+ }
+ union.setTag((byte) index);
+ }
+
private int computeHashCode(Object row) throws HiveException {
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide
- // better
- // load balance. If the requirement is to have a single reducer, we
- // should set
+ // If no partition cols, just distribute the data uniformly to provide better
+ // load balance. If the requirement is to have a single reducer, we should set
// the number of reducers to 1.
// Use a constant seed to make the code deterministic.
if (random == null) {
@@ -314,15 +336,19 @@ public class ReduceSinkOperator extends
return keyHashCode;
}
- protected void setKeyWritable(BinaryComparable key, int tag) {
+ // Serialize the keys and append the tag
+ protected HiveKey toHiveKey(Object obj, int tag, Integer distLength) throws SerDeException {
+ BinaryComparable key = (BinaryComparable)keySerializer.serialize(obj, keyObjectInspector);
+ int keyLength = key.getLength();
if (tag == -1) {
- keyWritable.set(key.getBytes(), 0, key.getLength());
+ keyWritable.set(key.getBytes(), 0, keyLength);
} else {
- int keyLength = key.getLength();
keyWritable.setSize(keyLength + 1);
System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
keyWritable.get()[keyLength] = tagByte[0];
}
+ keyWritable.setDistKeyLength((distLength == null) ? keyLength : distLength);
+ return keyWritable;
}
public void collect(byte[] key, byte[] value, int hash) throws IOException {
@@ -331,11 +357,6 @@ public class ReduceSinkOperator extends
collect(keyWritable, valueWritable);
}
- protected void collect(byte[] key, Writable valueWritable, int hash) throws IOException {
- HiveKey keyWritable = new HiveKey(key, hash);
- collect(keyWritable, valueWritable);
- }
-
protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
// Since this is a terminal operator, update counters explicitly -
// forward is not called
@@ -351,7 +372,7 @@ public class ReduceSinkOperator extends
}
}
- private BytesWritable getValue(Object row) throws Exception {
+ private BytesWritable makeValueWritable(Object row) throws Exception {
// Evaluate the value
for (int i = 0; i < valueEval.length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
@@ -366,6 +387,7 @@ public class ReduceSinkOperator extends
reducerHash.flush();
}
super.closeOp(abort);
+ out = null;
}
/**
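For illustration, the tryStoreKey() contract the rewritten processOp() relies on (FORWARD to emit as is, EXCLUDE to drop, otherwise a slot index to pass to storeValue) can be restated as a minimal, self-contained sketch. MiniTopN and its String keys are hypothetical stand-ins, not TopNHash itself:

    import java.util.TreeSet;

    // Hypothetical stand-in for the TopNHash contract used by ReduceSinkOperator.
    class MiniTopN {
        static final int FORWARD = -1; // emit the row to the collector as is
        static final int EXCLUDE = -2; // drop the row entirely
        private final int topN;
        private final TreeSet<String> heap = new TreeSet<String>();

        MiniTopN(int topN) { this.topN = topN; }

        int tryStoreKey(String key) {
            if (topN < 0) return FORWARD;         // top-N disabled: forward everything
            if (topN == 0) return EXCLUDE;        // LIMIT 0: eat all rows
            heap.add(key);                        // tentatively store the key
            if (heap.size() > topN) {
                String evicted = heap.pollLast(); // biggest key falls out of the top N
                if (evicted.equals(key)) return EXCLUDE;
            }
            return 0; // simplified slot index; the caller attaches the value next
        }
    }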
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Nov 12 18:23:05 2013
@@ -66,9 +66,6 @@ public class StatsTask extends Task<Stat
private Table table;
private List<LinkedHashMap<String, String>> dpPartSpecs;
- private static final List<String> collectableStats = StatsSetupConst.getStatsToBeCollected();
- private static final List<String> supportedStats = StatsSetupConst.getSupportedStats();
-
public StatsTask() {
super();
dpPartSpecs = null;
@@ -84,7 +81,7 @@ public class StatsTask extends Task<Stat
public Statistics() {
stats = new HashMap<String, LongWritable>();
- for (String statType : supportedStats) {
+ for (String statType : StatsSetupConst.supportedStats) {
stats.put(statType, new LongWritable(0L));
}
}
@@ -108,7 +105,7 @@ public class StatsTask extends Task<Stat
@Override
public String toString() {
- return org.apache.commons.lang.StringUtils.join(supportedStats, ", ");
+ return org.apache.commons.lang.StringUtils.join(StatsSetupConst.supportedStats, ", ");
}
}
@@ -207,7 +204,7 @@ public class StatsTask extends Task<Stat
boolean tableStatsExist = this.existStats(parameters);
- for (String statType : supportedStats) {
+ for (String statType : StatsSetupConst.supportedStats) {
if (parameters.containsKey(statType)) {
tblStats.setStat(statType, Long.parseLong(parameters.get(statType)));
}
@@ -226,14 +223,14 @@ public class StatsTask extends Task<Stat
// In case of a non-partitioned table, the key for stats temporary store is "rootDir"
if (statsAggregator != null) {
String aggKey = Utilities.getHashedStatsPrefix(work.getAggKey(), maxPrefixLength);
- updateStats(collectableStats, tblStats, statsAggregator, parameters,
+ updateStats(StatsSetupConst.statsRequireCompute, tblStats, statsAggregator, parameters,
aggKey, atomic);
statsAggregator.cleanUp(aggKey);
}
// The collectable stats for the aggregator need to be cleared.
// For example, if a file is being loaded, the old number of rows is not valid
else if (work.isClearAggregatorStats()) {
- for (String statType : collectableStats) {
+ for (String statType : StatsSetupConst.statsRequireCompute) {
if (parameters.containsKey(statType)) {
tblStats.setStat(statType, 0L);
}
@@ -242,9 +239,10 @@ public class StatsTask extends Task<Stat
// write table stats to metastore
parameters = tTable.getParameters();
- for (String statType : collectableStats) {
+ for (String statType : StatsSetupConst.statsRequireCompute) {
parameters.put(statType, Long.toString(tblStats.getStat(statType)));
}
+ parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
tTable.setParameters(parameters);
String tableFullName = table.getDbName() + "." + table.getTableName();
@@ -269,7 +267,7 @@ public class StatsTask extends Task<Stat
}
Map<String, Long> currentValues = new HashMap<String, Long>();
- for (String statType : supportedStats) {
+ for (String statType : StatsSetupConst.supportedStats) {
Long val = parameters.containsKey(statType) ? Long.parseLong(parameters.get(statType))
: 0L;
currentValues.put(statType, val);
@@ -288,11 +286,11 @@ public class StatsTask extends Task<Stat
LOG.info("Stats aggregator : " + partitionID);
if (statsAggregator != null) {
- updateStats(collectableStats, newPartStats, statsAggregator,
+ updateStats(StatsSetupConst.statsRequireCompute, newPartStats, statsAggregator,
parameters, partitionID, atomic);
statsAggregator.cleanUp(partitionID);
} else {
- for (String statType : collectableStats) {
+ for (String statType : StatsSetupConst.statsRequireCompute) {
// The collectable stats for the aggregator need to be cleared.
// For example, if a file is being loaded, the old number of rows is not valid
if (work.isClearAggregatorStats()) {
@@ -320,13 +318,14 @@ public class StatsTask extends Task<Stat
//
// update the metastore
//
- for (String statType : supportedStats) {
+ for (String statType : StatsSetupConst.supportedStats) {
long statValue = newPartStats.getStat(statType);
if (statValue >= 0) {
parameters.put(statType, Long.toString(newPartStats.getStat(statType)));
}
}
+ parameters.put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, StatsSetupConst.TRUE);
tPart.setParameters(parameters);
String tableFullName = table.getDbName() + "." + table.getTableName();
db.alterPartition(tableFullName, new Partition(table, tPart));
@@ -364,7 +363,7 @@ public class StatsTask extends Task<Stat
|| parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
}
- private void updateStats(List<String> statsList, Statistics stats,
+ private void updateStats(String[] statsList, Statistics stats,
StatsAggregator statsAggregator, Map<String, String> parameters,
String aggKey, boolean atomic) throws HiveException {
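For illustration, the pattern above — iterating the shared StatsSetupConst arrays and mirroring stat values into the table/partition parameter map — reduces to this self-contained sketch. Plain maps stand in for the metastore objects, and the stat names and flag key are illustrative stand-ins:

    import java.util.HashMap;
    import java.util.Map;

    class StatsParamsSketch {
        // Illustrative stand-in for StatsSetupConst.supportedStats.
        static final String[] SUPPORTED = {"numRows", "rawDataSize", "numFiles", "totalSize"};

        public static void main(String[] args) {
            Map<String, Long> collected = new HashMap<String, Long>();
            collected.put("numRows", 42L);

            // Stand-in for tTable.getParameters().
            Map<String, String> parameters = new HashMap<String, String>();
            for (String statType : SUPPORTED) {
                Long v = collected.containsKey(statType) ? collected.get(statType) : 0L;
                if (v >= 0) {
                    parameters.put(statType, Long.toString(v));
                }
            }
            // Analogous to the new STATS_GENERATED_VIA_STATS_TASK = TRUE marker.
            parameters.put("STATS_GENERATED_VIA_STATS_TASK", "true");
            System.out.println(parameters);
        }
    }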
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Tue Nov 12 18:23:05 2013
@@ -93,6 +93,8 @@ public abstract class Task<T extends Ser
// Bean methods
+ protected boolean rootTask;
+
protected List<Task<? extends Serializable>> childTasks;
protected List<Task<? extends Serializable>> parentTasks;
/**
@@ -172,6 +174,14 @@ public abstract class Task<T extends Ser
return false;
}
+ public boolean isRootTask() {
+ return rootTask;
+ }
+
+ public void setRootTask(boolean rootTask) {
+ this.rootTask = rootTask;
+ }
+
public void setChildTasks(List<Task<? extends Serializable>> childTasks) {
this.childTasks = childTasks;
}
@@ -506,4 +516,8 @@ public abstract class Task<T extends Ser
void setException(Throwable ex) {
exception = ex;
}
+
+ public String toString() {
+ return getId() + ":" + getType();
+ }
}
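The new rootTask bean and toString() are plain plumbing; a minimal stand-alone sketch of how a driver might use them (Task here is a local stand-in, not the Hive class):

    import java.util.ArrayList;
    import java.util.List;

    class TaskSketch {
        static class Task { // local stand-in mirroring the new bean methods
            private final String id, type;
            private boolean rootTask;
            List<Task> parentTasks = new ArrayList<Task>();
            Task(String id, String type) { this.id = id; this.type = type; }
            public boolean isRootTask() { return rootTask; }
            public void setRootTask(boolean rootTask) { this.rootTask = rootTask; }
            public String toString() { return id + ":" + type; } // matches the new format
        }

        public static void main(String[] args) {
            Task t = new Task("Stage-1", "MAPRED");
            t.setRootTask(t.parentTasks.isEmpty()); // tasks with no parents are roots
            System.out.println(t + " root=" + t.isRootTask()); // Stage-1:MAPRED root=true
        }
    }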
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Tue Nov 12 18:23:05 2013
@@ -31,6 +31,8 @@ import com.google.common.collect.MinMaxP
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
@@ -51,11 +53,9 @@ public class TopNHash {
public void collect(byte[] key, byte[] value, int hash) throws IOException;
}
- public static final int FORWARD = -1;
- public static final int EXCLUDED = -2;
- private static final int FLUSH = -3;
- private static final int DISABLE = -4;
- private static final int MAY_FORWARD = -5;
+ public static final int FORWARD = -1; // Forward the row to reducer as is.
+ public static final int EXCLUDE = -2; // Discard the row.
+ private static final int MAY_FORWARD = -3; // Vectorized - may forward the row, not sure yet.
private BinaryCollector collector;
private int topN;
@@ -67,14 +67,17 @@ public class TopNHash {
private byte[][] keys;
private byte[][] values;
private int[] hashes;
+ private int[] distKeyLengths;
private IndexStore indexes; // The heap over the keys, storing indexes in the array.
private int evicted; // recently evicted index (used for next key/value)
private int excluded; // count of excluded rows from previous flush
- // temporary stuff used for vectorization
+ // temporary single-batch context used for vectorization
private int batchNumForwards = 0; // count of forwarded keys in the current batch
private int[] indexToBatchIndex; // mapping of index (lined up w/keys) to index in the batch
+ private int[] batchIndexToResult; // mapping of index in the batch (linear) to hash result
+ private int batchSize; // Size of the current batch.
private boolean isEnabled = false;
@@ -82,7 +85,9 @@ public class TopNHash {
public int compare(Integer o1, Integer o2) {
byte[] key1 = keys[o1];
byte[] key2 = keys[o2];
- return WritableComparator.compareBytes(key1, 0, key1.length, key2, 0, key2.length);
+ int length1 = distKeyLengths[o1];
+ int length2 = distKeyLengths[o2];
+ return WritableComparator.compareBytes(key1, 0, length1, key2, 0, length2);
}
};
@@ -107,6 +112,7 @@ public class TopNHash {
this.keys = new byte[topN + 1][];
this.values = new byte[topN + 1][];
this.hashes = new int[topN + 1];
+ this.distKeyLengths = new int[topN + 1];
this.evicted = topN;
this.isEnabled = true;
}
@@ -118,12 +124,12 @@ public class TopNHash {
* TopNHash.EXCLUDE if the row should be discarded;
* any other number if the row is to be stored; the index should be passed to storeValue.
*/
- public int tryStoreKey(BytesWritable key) throws HiveException, IOException {
+ public int tryStoreKey(HiveKey key) throws HiveException, IOException {
if (!isEnabled) {
return FORWARD; // short-circuit quickly - forward all rows
}
if (topN == 0) {
- return EXCLUDED; // short-circuit quickly - eat all rows
+ return EXCLUDE; // short-circuit quickly - eat all rows
}
int index = insertKeyIntoHeap(key);
if (index >= 0) {
@@ -132,21 +138,8 @@ public class TopNHash {
}
// IndexStore is trying to tell us something.
switch (index) {
- case DISABLE: {
- LOG.info("Top-N hash is disabled");
- flushInternal();
- isEnabled = false;
- return FORWARD;
- }
- case FLUSH: {
- LOG.info("Top-N hash is flushed");
- flushInternal();
- // we can now retry adding key/value into hash, which is flushed.
- // but for simplicity, just forward them
- return FORWARD;
- }
case FORWARD: return FORWARD;
- case EXCLUDED: return EXCLUDED; // skip the row.
+ case EXCLUDE: return EXCLUDE; // skip the row.
default: {
assert false;
throw new HiveException("Invalid result trying to store the key: " + index);
@@ -157,15 +150,16 @@ public class TopNHash {
/**
* Perform basic checks and initialize TopNHash for the new vectorized row batch.
+ * @param size batch size
* @return TopNHash.FORWARD if all rows should be forwarded w/o trying to call TopN;
* TopNHash.EXCLUDE if all rows should be discarded w/o trying to call TopN;
* any other result means the batch has been started.
*/
- public int startVectorizedBatch() throws IOException, HiveException {
+ public int startVectorizedBatch(int size) throws IOException, HiveException {
if (!isEnabled) {
return FORWARD; // short-circuit quickly - forward all rows
} else if (topN == 0) {
- return EXCLUDED; // short-circuit quickly - eat all rows
+ return EXCLUDE; // short-circuit quickly - eat all rows
}
// Flush here if the memory usage is too high. After that, we have the entire
// batch already in memory anyway so we will bypass the memory checks.
@@ -179,8 +173,13 @@ public class TopNHash {
return FORWARD; // Hash is ineffective, disable.
}
}
+ // Started ok; initialize context for new batch.
+ batchSize = size;
+ if (batchIndexToResult == null || batchIndexToResult.length < batchSize) {
+ batchIndexToResult = new int[Math.max(batchSize, VectorizedRowBatch.DEFAULT_SIZE)];
+ }
if (indexToBatchIndex == null) {
- indexToBatchIndex = new int[topN + 1]; // for current batch, contains key index in the batch
+ indexToBatchIndex = new int[topN + 1];
}
Arrays.fill(indexToBatchIndex, -1);
batchNumForwards = 0;
@@ -191,33 +190,28 @@ public class TopNHash {
* Try to put the key from the current vectorized batch into the heap.
* @param key the key.
* @param batchIndex The index of the key in the vectorized batch (sequential, not .selected).
- * @param results The results; the number of elements equivalent to vrg.size, by kindex.
- * The result should be the same across the calls for the batch; in then end, for each k-index:
- * - TopNHash.EXCLUDED - discard the row.
- * - positive index - store the row using storeValue, same as tryStoreRow.
- * - negative index - forward the row. getVectorizedKeyToForward called w/this index will
- * return the key to use so it doesn't have to be rebuilt.
*/
- public void tryStoreVectorizedKey(BytesWritable key, int batchIndex, int[] results)
- throws HiveException, IOException {
+ public void tryStoreVectorizedKey(HiveKey key, int batchIndex)
+ throws HiveException, IOException {
// Assumption - batchIndex is increasing; startVectorizedBatch was called
int size = indexes.size();
int index = size < topN ? size : evicted;
keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+ distKeyLengths[index] = key.getDistKeyLength();
Integer collisionIndex = indexes.store(index);
if (null != collisionIndex) {
// forward conditional on the survival of the corresponding key currently in indexes.
++batchNumForwards;
- results[batchIndex] = MAY_FORWARD - collisionIndex;
+ batchIndexToResult[batchIndex] = MAY_FORWARD - collisionIndex;
return;
}
indexToBatchIndex[index] = batchIndex;
- results[batchIndex] = index;
+ batchIndexToResult[batchIndex] = index;
if (size != topN) return;
evicted = indexes.removeBiggest(); // remove the biggest key
if (index == evicted) {
excluded++;
- results[batchIndex] = EXCLUDED;
+ batchIndexToResult[batchIndex] = EXCLUDE;
indexToBatchIndex[index] = -1;
return; // input key is bigger than any of keys in hash
}
@@ -225,36 +219,54 @@ public class TopNHash {
int evictedBatchIndex = indexToBatchIndex[evicted];
if (evictedBatchIndex >= 0) {
// reset the result for the evicted index
- results[evictedBatchIndex] = EXCLUDED;
+ batchIndexToResult[evictedBatchIndex] = EXCLUDE;
indexToBatchIndex[evicted] = -1;
}
- // Also evict all results grouped with this index; cannot be current key or before it.
- if (batchNumForwards > 0) {
- int evictedForward = (MAY_FORWARD - evicted);
- boolean forwardRemoved = false;
- for (int i = evictedBatchIndex + 1; i < batchIndex; ++i) {
- if (results[i] == evictedForward) {
- results[i] = EXCLUDED;
- forwardRemoved = true;
- }
- }
- if (forwardRemoved) {
+ // Evict all results grouped with this index; it cannot be any key further in the batch.
+ // If we evict a key from this batch, the keys grouped with it cannot be earlier than that key.
+ // If we evict a key that is not from this batch, initial i = (-1) + 1 = 0, as intended.
+ int evictedForward = (MAY_FORWARD - evicted);
+ for (int i = evictedBatchIndex + 1; i < batchIndex && (batchNumForwards > 0); ++i) {
+ if (batchIndexToResult[i] == evictedForward) {
+ batchIndexToResult[i] = EXCLUDE;
--batchNumForwards;
}
}
}
/**
+ * Get vectorized batch result for particular index.
+ * @param batchIndex index of the key in the batch.
+ * @return the result, same as from {@link #tryStoreKey(HiveKey)}
+ */
+ public int getVectorizedBatchResult(int batchIndex) {
+ int result = batchIndexToResult[batchIndex];
+ return (result <= MAY_FORWARD) ? FORWARD : result;
+ }
+
+ /**
* After vectorized batch is processed, can return the key that caused a particular row
* to be forwarded. Because the row could only be marked to forward because it has
* the same key with some row already in the heap (for GBY), we can use that key from the
* heap to emit the forwarded row.
- * @param index Negative index from the vectorized result. See tryStoreVectorizedKey.
- * @return The key corresponding to the row.
+ * @param batchIndex index of the key in the batch.
+ * @return The key corresponding to the index.
*/
- public byte[] getVectorizedKeyToForward(int index) {
- assert index <= MAY_FORWARD;
- return keys[MAY_FORWARD - index];
+ public HiveKey getVectorizedKeyToForward(int batchIndex) {
+ int index = MAY_FORWARD - batchIndexToResult[batchIndex];
+ HiveKey hk = new HiveKey();
+ hk.set(keys[index], 0, keys[index].length);
+ hk.setDistKeyLength(distKeyLengths[index]);
+ return hk;
+ }
+
+ /**
+ * After vectorized batch is processed, can return distribution keys length of a key.
+ * @param batchIndex index of the key in the batch.
+ * @return The distribution length corresponding to the key.
+ */
+ public int getVectorizedKeyDistLength(int batchIndex) {
+ return distKeyLengths[batchIndexToResult[batchIndex]];
}
/**
@@ -289,16 +301,22 @@ public class TopNHash {
* <p/>
* -1 for FORWARD : should be forwarded to output collector (for GBY)
* -2 for EXCLUDE : not in top-k. ignore it
- * -3 for FLUSH : memory is not enough. flush values (keep keys only)
- * -4 for DISABLE : hash is not effective. flush and disable it
*/
- private int insertKeyIntoHeap(BinaryComparable key) {
+ private int insertKeyIntoHeap(HiveKey key) throws IOException, HiveException {
if (usage > threshold) {
- return excluded == 0 ? DISABLE : FLUSH;
+ flushInternal();
+ if (excluded == 0) {
+ LOG.info("Top-N hash is disabled");
+ isEnabled = false;
+ }
+ // we can now retry adding key/value into hash, which is flushed.
+ // but for simplicity, just forward them
+ return FORWARD;
}
int size = indexes.size();
int index = size < topN ? size : evicted;
keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+ distKeyLengths[index] = key.getDistKeyLength();
if (null != indexes.store(index)) {
// it's only for GBY which should forward all values associated with the key in the range
// of limit. new value should be attached with the key but in current implementation,
@@ -310,7 +328,7 @@ public class TopNHash {
evicted = indexes.removeBiggest(); // remove the biggest key
if (index == evicted) {
excluded++;
- return EXCLUDED; // input key is bigger than any of keys in hash
+ return EXCLUDE; // input key is bigger than any of keys in hash
}
removed(evicted);
}
@@ -326,6 +344,7 @@ public class TopNHash {
values[index] = null;
}
hashes[index] = -1;
+ distKeyLengths[index] = -1;
}
private void flushInternal() throws IOException, HiveException {
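The reworked vectorized protocol (startVectorizedBatch, tryStoreVectorizedKey, getVectorizedBatchResult, getVectorizedKeyToForward) is driven in two passes over a batch; a minimal caller-side sketch with the Hive types stubbed out (Hash and emit() are hypothetical stand-ins):

    import java.util.Arrays;

    class VectorBatchSketch {
        interface Hash { // hypothetical stub mirroring the TopNHash batch API
            int FORWARD = -1, EXCLUDE = -2;
            int startVectorizedBatch(int size) throws Exception;
            void tryStoreVectorizedKey(byte[] key, int batchIndex) throws Exception;
            int getVectorizedBatchResult(int batchIndex);
            byte[] getVectorizedKeyToForward(int batchIndex);
            void storeValue(int index, byte[] value, int hash, boolean vectorized);
        }

        static void processBatch(Hash h, byte[][] keys, byte[][] values, int[] hashes)
                throws Exception {
            int start = h.startVectorizedBatch(keys.length);
            if (start == Hash.EXCLUDE) return;            // drop the whole batch
            boolean useTopN = (start != Hash.FORWARD);
            for (int i = 0; i < keys.length; i++) {       // pass 1: offer every key
                if (useTopN) h.tryStoreVectorizedKey(keys[i], i);
                else emit(keys[i], values[i], hashes[i]); // top-N disabled: forward directly
            }
            if (!useTopN) return;
            for (int i = 0; i < keys.length; i++) {       // pass 2: act on per-row verdicts
                int r = h.getVectorizedBatchResult(i);
                if (r == Hash.EXCLUDE) continue;          // row lost to an eviction
                if (r == Hash.FORWARD) {
                    emit(h.getVectorizedKeyToForward(i), values[i], hashes[i]);
                } else {
                    h.storeValue(r, values[i], hashes[i], true);
                }
            }
        }

        static void emit(byte[] k, byte[] v, int hash) {
            System.out.println(Arrays.toString(k) + " -> " + Arrays.toString(v) + " @" + hash);
        }
    }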
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UDTFOperator.java Tue Nov 12 18:23:05 2013
@@ -44,7 +44,7 @@ public class UDTFOperator extends Operat
protected final Log LOG = LogFactory.getLog(this.getClass().getName());
- ObjectInspector[] udtfInputOIs = null;
+ StructObjectInspector udtfInputOI = null;
Object[] objToSendToUDTF = null;
GenericUDTF genericUDTF;
@@ -63,22 +63,16 @@ public class UDTFOperator extends Operat
genericUDTF.setCollector(collector);
- // Make an object inspector [] of the arguments to the UDTF
- List<? extends StructField> inputFields =
- ((StructObjectInspector) inputObjInspectors[0]).getAllStructFieldRefs();
-
- udtfInputOIs = new ObjectInspector[inputFields.size()];
- for (int i = 0; i < inputFields.size(); i++) {
- udtfInputOIs[i] = inputFields.get(i).getFieldObjectInspector();
- }
- objToSendToUDTF = new Object[inputFields.size()];
+ udtfInputOI = (StructObjectInspector) inputObjInspectors[0];
+
+ objToSendToUDTF = new Object[udtfInputOI.getAllStructFieldRefs().size()];
MapredContext context = MapredContext.get();
if (context != null) {
context.setup(genericUDTF);
}
- StructObjectInspector udtfOutputOI = genericUDTF.initialize(
- udtfInputOIs);
+ StructObjectInspector udtfOutputOI = genericUDTF.initialize(udtfInputOI);
+
if (conf.isOuterLV()) {
outerObj = Arrays.asList(new Object[udtfOutputOI.getAllStructFieldRefs().size()]);
}
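With the operator now handing the whole input row's StructObjectInspector to the UDTF, a minimal pass-through UDTF against the initialize(StructObjectInspector) overload would look roughly like this (a sketch, assuming that overload on GenericUDTF in this branch, as the call at genericUDTF.initialize(udtfInputOI) above suggests):

    import java.util.Arrays;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class PassThroughUDTF extends GenericUDTF {
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs)
                throws UDFArgumentException {
            // The full input row OI arrives directly; no need to unpack field OIs by hand.
            return ObjectInspectorFactory.getStandardStructObjectInspector(
                Arrays.asList("col0"),
                Arrays.<ObjectInspector>asList(
                    PrimitiveObjectInspectorFactory.javaStringObjectInspector));
        }

        @Override
        public void process(Object[] args) throws HiveException {
            forward(new Object[] {String.valueOf(args[0])}); // emit first column as string
        }

        @Override
        public void close() throws HiveException {
        }
    }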
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov 12 18:23:05 2013
@@ -100,6 +100,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.metastore.Warehouse;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
@@ -126,6 +127,9 @@ import org.apache.hadoop.hive.ql.io.rcfi
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.HiveUtils;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -2031,7 +2035,7 @@ public final class Utilities {
}
}
- public static Object INPUT_SUMMARY_LOCK = new Object();
+ private static final Object INPUT_SUMMARY_LOCK = new Object();
/**
* Calculate the total size of input files.
@@ -2114,26 +2118,47 @@ public final class Utilities {
// is not correct.
final Configuration myConf = conf;
final JobConf myJobConf = jobConf;
+ final Map<String, Operator<?>> aliasToWork = work.getAliasToWork();
+ final Map<String, ArrayList<String>> pathToAlias = work.getPathToAliases();
final PartitionDesc partDesc = work.getPathToPartitionInfo().get(
p.toString());
Runnable r = new Runnable() {
public void run() {
try {
- ContentSummary resultCs;
-
Class<? extends InputFormat> inputFormatCls = partDesc
.getInputFileFormatClass();
InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
inputFormatCls, myJobConf);
if (inputFormatObj instanceof ContentSummaryInputFormat) {
- resultCs = ((ContentSummaryInputFormat) inputFormatObj).getContentSummary(p,
- myJobConf);
- } else {
+ ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
+ resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
+ return;
+ }
+ HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf,
+ partDesc.getOverlayedProperties().getProperty(
+ hive_metastoreConstants.META_TABLE_STORAGE));
+ if (handler == null) {
+ // native table
FileSystem fs = p.getFileSystem(myConf);
- resultCs = fs.getContentSummary(p);
+ resultMap.put(pathStr, fs.getContentSummary(p));
+ return;
+ }
+ if (handler instanceof InputEstimator) {
+ long total = 0;
+ TableDesc tableDesc = partDesc.getTableDesc();
+ InputEstimator estimator = (InputEstimator) handler;
+ for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
+ JobConf jobConf = new JobConf(myJobConf);
+ TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
+ Utilities.setColumnNameList(jobConf, scanOp, true);
+ Utilities.setColumnTypeList(jobConf, scanOp, true);
+ PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
+ Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
+ total += estimator.estimate(myJobConf, scanOp, -1).getTotalLength();
+ }
+ resultMap.put(pathStr, new ContentSummary(total, -1, -1));
}
- resultMap.put(pathStr, resultCs);
- } catch (IOException e) {
+ } catch (Exception e) {
// We safely ignore this exception for summary data.
// We don't update the cache to protect it from polluting other
// usages. The worst case is that IOException will always be
@@ -2340,12 +2365,19 @@ public final class Utilities {
}
public static void setColumnNameList(JobConf jobConf, Operator op) {
+ setColumnNameList(jobConf, op, false);
+ }
+
+ public static void setColumnNameList(JobConf jobConf, Operator op, boolean excludeVCs) {
RowSchema rowSchema = op.getSchema();
if (rowSchema == null) {
return;
}
StringBuilder columnNames = new StringBuilder();
for (ColumnInfo colInfo : rowSchema.getSignature()) {
+ if (excludeVCs && colInfo.getIsVirtualCol()) {
+ continue;
+ }
if (columnNames.length() > 0) {
columnNames.append(",");
}
@@ -2356,12 +2388,19 @@ public final class Utilities {
}
public static void setColumnTypeList(JobConf jobConf, Operator op) {
+ setColumnTypeList(jobConf, op, false);
+ }
+
+ public static void setColumnTypeList(JobConf jobConf, Operator op, boolean excludeVCs) {
RowSchema rowSchema = op.getSchema();
if (rowSchema == null) {
return;
}
StringBuilder columnTypes = new StringBuilder();
for (ColumnInfo colInfo : rowSchema.getSignature()) {
+ if (excludeVCs && colInfo.getIsVirtualCol()) {
+ continue;
+ }
if (columnTypes.length() > 0) {
columnTypes.append(",");
}
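The new excludeVCs variants are a plain filter-and-join over the row schema; stripped of the Hive types, the pattern is (Col is an illustrative stand-in for ColumnInfo):

    import java.util.Arrays;
    import java.util.List;

    class ColumnListSketch {
        static class Col { // illustrative stand-in for ColumnInfo
            final String name;
            final boolean virtualCol;
            Col(String name, boolean virtualCol) { this.name = name; this.virtualCol = virtualCol; }
        }

        static String columnNames(List<Col> signature, boolean excludeVCs) {
            StringBuilder sb = new StringBuilder();
            for (Col c : signature) {
                if (excludeVCs && c.virtualCol) continue; // estimators don't want virtual cols
                if (sb.length() > 0) sb.append(",");
                sb.append(c.name);
            }
            return sb.toString();
        }

        public static void main(String[] args) {
            List<Col> sig = Arrays.asList(new Col("key", false), new Col("value", false),
                new Col("INPUT__FILE__NAME", true));
            System.out.println(columnNames(sig, true)); // key,value
        }
    }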
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Nov 12 18:23:05 2013
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.exec.Fe
import org.apache.hadoop.hive.ql.exec.HiveTotalOrderPartitioner;
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.PartitionKeySampler;
import org.apache.hadoop.hive.ql.exec.TableScanOperator;
import org.apache.hadoop.hive.ql.exec.Task;
@@ -81,6 +82,7 @@ import org.apache.hadoop.mapred.Counters
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.log4j.Appender;
@@ -128,7 +130,6 @@ public class ExecDriver extends Task<Map
private void initializeFiles(String prop, String files) {
if (files != null && files.length() > 0) {
job.set(prop, files);
- ShimLoader.getHadoopShims().setTmpFiles(prop, files);
}
}
@@ -543,7 +544,7 @@ public class ExecDriver extends Task<Map
FetchOperator fetcher = PartitionKeySampler.createSampler(fetchWork, conf, job, ts);
try {
ts.initialize(conf, new ObjectInspector[]{fetcher.getOutputObjectInspector()});
- ts.setOutputCollector(sampler);
+ OperatorUtils.setChildrenCollector(ts.getChildOperators(), sampler);
while (fetcher.pushRow()) { }
} finally {
fetcher.clearFetchContext();
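OperatorUtils.setChildrenCollector presumably walks the operator tree and hands the collector to the sink operators below, which is why the direct setOutputCollector calls can be dropped here and in the other call sites in this commit. A self-contained sketch of that traversal (Op and OutputCollector are local stand-ins, not the actual OperatorUtils implementation):

    import java.util.ArrayList;
    import java.util.List;

    class CollectorWiringSketch {
        interface OutputCollector { void collect(Object key, Object value); }

        static class Op { // local stand-in for Operator<?>
            List<Op> childOperators = new ArrayList<Op>();
            OutputCollector out;   // only sink operators actually use this
            boolean isSink;        // e.g. a ReduceSinkOperator
        }

        // Recursively hand the collector to every sink operator in the subtree.
        static void setChildrenCollector(List<Op> ops, OutputCollector out) {
            if (ops == null) return;
            for (Op op : ops) {
                if (op.isSink) {
                    op.out = out;
                }
                setChildrenCollector(op.childOperators, out);
            }
        }
    }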
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Tue Nov 12 18:23:05 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
import org.apache.hadoop.hive.ql.exec.ObjectCache;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -158,7 +159,7 @@ public class ExecMapper extends MapReduc
if (oc == null) {
oc = output;
rp = reporter;
- mo.setOutputCollector(oc);
+ OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
mo.setReporter(rp);
MapredContext.get().setReporter(reporter);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Tue Nov 12 18:23:05 2013
@@ -188,7 +188,6 @@ public class ExecReducer extends MapRedu
// propagate reporter and output collector to all operators
oc = output;
rp = reporter;
- reducer.setOutputCollector(oc);
reducer.setReporter(rp);
MapredContext.get().setReporter(reporter);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/HadoopJobExecHelper.java Tue Nov 12 18:23:05 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapred.Counters
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;
@@ -240,7 +241,7 @@ public class HadoopJobExecHelper {
} catch (InterruptedException e) {
}
- if (initializing && ShimLoader.getHadoopShims().isJobPreparing(rj)) {
+ if (initializing && rj.getJobState() == JobStatus.PREP) {
// No reason to poll until the job is initialized
continue;
} else {
@@ -590,12 +591,6 @@ public class HadoopJobExecHelper {
List<Integer> reducersRunTimes = new ArrayList<Integer>();
for (TaskCompletionEvent taskCompletion : taskCompletions) {
- String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(taskCompletion);
- if (taskJobIds == null) {
- // Task attempt info is unavailable in this Hadoop version");
- continue;
- }
- String taskId = taskJobIds[0];
if (!taskCompletion.isMapTask()) {
reducersRunTimes.add(new Integer(taskCompletion.getTaskRunTime()));
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/JobDebugger.java Tue Nov 12 18:23:05 2013
@@ -157,20 +157,10 @@ public class JobDebugger implements Runn
boolean more = true;
boolean firstError = true;
for (TaskCompletionEvent t : taskCompletions) {
- // getTaskJobIDs returns Strings for compatibility with Hadoop versions
- // without TaskID or TaskAttemptID
- String[] taskJobIds = ShimLoader.getHadoopShims().getTaskJobIDs(t);
-
- if (taskJobIds == null) {
- console.printError("Task attempt info is unavailable in this Hadoop version");
- more = false;
- break;
- }
-
// For each task completion event, get the associated task id, job id
// and the logs
- String taskId = taskJobIds[0];
- String jobId = taskJobIds[1];
+ String taskId = t.getTaskAttemptId().getTaskID().toString();
+ String jobId = t.getTaskAttemptId().getJobID().toString();
if (firstError) {
console.printError("Examining task ID: " + taskId + " (and more) from job " + jobId);
firstError = false;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapRedTask.java Tue Nov 12 18:23:05 2013
@@ -178,12 +178,7 @@ public class MapRedTask extends ExecDriv
String isSilent = "true".equalsIgnoreCase(System
.getProperty("test.silent")) ? "-nolog" : "";
- String jarCmd;
- if (ShimLoader.getHadoopShims().usesJobShell()) {
- jarCmd = libJarsOption + hiveJar + " " + ExecDriver.class.getName();
- } else {
- jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;
- }
+ String jarCmd = hiveJar + " " + ExecDriver.class.getName() + libJarsOption;
String cmdLine = hadoopExec + " jar " + jarCmd + " -plan "
+ planPath.toString() + " " + isSilent + " " + hiveConfArgs;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Tue Nov 12 18:23:05 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
import org.apache.hadoop.hive.ql.exec.ObjectCache;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
@@ -123,7 +124,7 @@ public class MapRecordProcessor extends
}
}
- mapOp.setOutputCollector(out);
+ OperatorUtils.setChildrenCollector(mapOp.getChildOperators(), out);
mapOp.setReporter(reporter);
MapredContext.get().setReporter(reporter);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Nov 12 18:23:05 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.t
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -30,6 +31,7 @@ import org.apache.hadoop.hive.ql.exec.Ma
import org.apache.hadoop.hive.ql.exec.ObjectCache;
import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.reportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
@@ -154,6 +156,17 @@ public class ReduceRecordProcessor exte
}
}
+ // set output collector for any reduce sink operators in the pipeline.
+ List<Operator<? extends OperatorDesc>> children = new LinkedList<Operator<? extends OperatorDesc>>();
+ children.add(reducer);
+ if (dummyOps != null) {
+ children.addAll(dummyOps);
+ }
+ OperatorUtils.setChildrenCollector(children, out);
+
+ reducer.setReporter(reporter);
+ MapredContext.get().setReporter(reporter);
+
} catch (Throwable e) {
abort = true;
if (e instanceof OutOfMemoryError) {
@@ -164,10 +177,6 @@ public class ReduceRecordProcessor exte
}
}
- reducer.setOutputCollector(out);
- reducer.setReporter(reporter);
- MapredContext.get().setReporter(reporter);
-
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Tue Nov 12 18:23:05 2013
@@ -30,6 +30,7 @@ import org.apache.hadoop.hive.ql.exec.To
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
+import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -39,9 +40,11 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+// import org.apache.hadoop.util.StringUtils;
public class VectorReduceSinkOperator extends ReduceSinkOperator {
@@ -87,12 +90,6 @@ public class VectorReduceSinkOperator ex
*/
private transient VectorExpressionWriter[] partitionWriters;
- private transient ObjectInspector keyObjectInspector;
- private transient ObjectInspector valueObjectInspector;
- private transient int [] keyHashCode = new int [VectorizedRowBatch.DEFAULT_SIZE];
-
- private transient int[] hashResult; // the pre-created array for reducerHash results
-
public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
throws HiveException {
this();
@@ -110,7 +107,6 @@ public class VectorReduceSinkOperator ex
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
-
numDistributionKeys = conf.getNumDistributionKeys();
distinctColIndices = conf.getDistinctColumnIndices();
numDistinctExprs = distinctColIndices.size();
@@ -183,7 +179,7 @@ public class VectorReduceSinkOperator ex
numDistributionKeys;
cachedKeys = new Object[numKeys][keyLen];
cachedValues = new Object[valueEval.length];
-
+
int tag = conf.getTag();
tagByte[0] = (byte) tag;
LOG.info("Using tag = " + tag);
@@ -209,81 +205,84 @@ public class VectorReduceSinkOperator ex
partitionEval.length));
try {
-
- for (int i = 0; i < partitionEval.length; i++) {
- partitionEval[i].evaluate(vrg);
- }
-
- // run the vector evaluations
- for (int i = 0; i < valueEval.length; i++) {
- valueEval[i].evaluate(vrg);
- }
// Evaluate the keys
for (int i = 0; i < keyEval.length; i++) {
keyEval[i].evaluate(vrg);
}
- Object[] distributionKeys = new Object[numDistributionKeys];
-
// Determine which rows we need to emit based on topN optimization
- int startResult = reducerHash.startVectorizedBatch();
- if (startResult == TopNHash.EXCLUDED) {
+ int startResult = reducerHash.startVectorizedBatch(vrg.size);
+ if (startResult == TopNHash.EXCLUDE) {
return; // TopN wants us to exclude all rows.
}
- boolean useTopN = startResult != TopNHash.FORWARD;
- if (useTopN && (hashResult == null || hashResult.length < vrg.size)) {
- hashResult = new int[Math.max(vrg.size, VectorizedRowBatch.DEFAULT_SIZE)];
+ // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
+ for (int i = 0; i < partitionEval.length; i++) {
+ partitionEval[i].evaluate(vrg);
+ }
+ // run the vector evaluations
+ for (int i = 0; i < valueEval.length; i++) {
+ valueEval[i].evaluate(vrg);
}
- for (int j = 0 ; j < vrg.size; ++j) {
- int rowIndex = j;
+ boolean useTopN = startResult != TopNHash.FORWARD;
+ // Go through the batch once. If we are not using TopN, we will forward everything and be done.
+ // If we are using topN, we will make the first key for each row and store/forward it.
+ // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
+ for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+ int rowIndex = batchIndex;
if (vrg.selectedInUse) {
- rowIndex = vrg.selected[j];
+ rowIndex = vrg.selected[batchIndex];
}
- // First, evaluate the key - the way things stand we'd need it regardless.
- for (int i = 0; i < keyEval.length; i++) {
- int batchColumn = keyEval[i].getOutputColumn();
- ColumnVector vectorColumn = vrg.cols[batchColumn];
- distributionKeys[i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
+ // First, make distribution key components for this row and determine distKeyLength.
+ populateCachedDistributionKeys(vrg, rowIndex, 0);
+ HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
+ int distKeyLength = firstKey.getDistKeyLength();
+ // Add first distinct expression, if any.
+ if (numDistinctExprs > 0) {
+ populateCachedDistinctKeys(vrg, rowIndex, 0);
+ firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
}
- // no distinct key
- System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
- // TopN is not supported for multi-distinct currently. If we have more cachedKeys
- // than one for every input key horrible things will happen (OOB error on array likely).
- assert !useTopN || cachedKeys.length <= 1;
- for (int i = 0; i < cachedKeys.length; i++) {
- // Serialize the keys and append the tag.
- Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
- setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
- if (useTopN) {
- reducerHash.tryStoreVectorizedKey(keyWritable, j, hashResult);
- } else {
- // No TopN, just forward the key
- keyWritable.setHashCode(computeHashCode(vrg, rowIndex));
- collect(keyWritable, makeValueWritable(vrg, rowIndex));
- }
+
+ if (useTopN) {
+ reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
+ } else {
+ // No TopN, just forward the first key and all others.
+ int hashCode = computeHashCode(vrg, rowIndex);
+ firstKey.setHashCode(hashCode);
+ BytesWritable value = makeValueWritable(vrg, rowIndex);
+ collect(firstKey, value);
+ forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
}
}
if (!useTopN) return; // All done.
// If we use topN, we have called tryStore on every key now. We can process the results.
- for (int j = 0 ; j < vrg.size; ++j) {
- int index = hashResult[j];
- if (index == TopNHash.EXCLUDED) continue;
- int rowIndex = j;
+ for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+ int result = reducerHash.getVectorizedBatchResult(batchIndex);
+ if (result == TopNHash.EXCLUDE) continue;
+ int rowIndex = batchIndex;
if (vrg.selectedInUse) {
- rowIndex = vrg.selected[j];
+ rowIndex = vrg.selected[batchIndex];
}
- // Compute everything now - we'd either store it, or forward it.
+ // Compute value and hashcode - we'd either store or forward them.
int hashCode = computeHashCode(vrg, rowIndex);
BytesWritable value = makeValueWritable(vrg, rowIndex);
- if (index < 0) {
- // Kinda hacky; see getVectorizedKeyToForward javadoc.
- byte[] key = reducerHash.getVectorizedKeyToForward(index);
- collect(key, value, hashCode);
+ int distKeyLength = -1;
+ if (result == TopNHash.FORWARD) {
+ HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
+ firstKey.setHashCode(hashCode);
+ distKeyLength = firstKey.getDistKeyLength();
+ collect(firstKey, value);
} else {
- reducerHash.storeValue(index, value, hashCode, true);
+ reducerHash.storeValue(result, value, hashCode, true);
+ distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
+ }
+ // Now forward the other rows if there's a multi-distinct (but see TODO in forwardExtraDistinctRows).
+ // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
+ if (numDistinctExprs > 1) {
+ populateCachedDistributionKeys(vrg, rowIndex, 1);
+ forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
}
}
} catch (SerDeException e) {
@@ -293,6 +292,74 @@ public class VectorReduceSinkOperator ex
}
}
+ /**
+ * This function creates and forwards all the additional KVs for the multi-distinct case,
+ * after the first (0th) KV pertaining to the row has already been stored or forwarded.
+ * @param vrg the batch
+ * @param rowIndex the row index in the batch
+ * @param hashCode the partitioning hash code to use; same as for the first KV
+ * @param value the value to use; same as for the first KV
+ * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
+ * @param tag the tag
+ * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
+ */
+ private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex, int hashCode,
+ BytesWritable value, int distKeyLength, int tag, int baseIndex)
+ throws HiveException, SerDeException, IOException {
+ // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
+ // the first key has already been stored. There's few bytes difference between keys
+ // for different distincts, and the value/etc. are all the same.
+ // We could store deltas to re-gen extra rows when flushing TopN.
+ for (int i = 1; i < numDistinctExprs; i++) {
+ if (i != baseIndex) {
+ System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
+ }
+ populateCachedDistinctKeys(vrg, rowIndex, i);
+ HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
+ hiveKey.setHashCode(hashCode);
+ collect(hiveKey, value);
+ }
+ }
+
+ /**
+ * Populate distribution keys part of cachedKeys for a particular row from the batch.
+ * @param vrg the batch
+ * @param rowIndex the row index in the batch
+ * @param index the cachedKeys index to write to
+ */
+ private void populatedCachedDistributionKeys(
+ VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
+ for (int i = 0; i < numDistributionKeys; i++) {
+ int batchColumn = keyEval[i].getOutputColumn();
+ ColumnVector vectorColumn = vrg.cols[batchColumn];
+ cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
+ }
+ if (cachedKeys[index].length > numDistributionKeys) {
+ cachedKeys[index][numDistributionKeys] = null;
+ }
+ }
+
+ /**
+ * Populate the distinct-key part of cachedKeys for a particular row from the batch.
+ * @param vrg the batch
+ * @param rowIndex the row index in the batch
+ * @param index the cachedKeys index to write to
+ */
+ private void populateCachedDistinctKeys(
+ VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
+ StandardUnion union;
+ cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
+ (byte)index, new Object[distinctColIndices.get(index).size()]);
+ Object[] distinctParameters = (Object[]) union.getObject();
+ for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
+ int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
+ int batchColumn = keyEval[distinctColIndex].getOutputColumn();
+ distinctParameters[distinctParamI] =
+ keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
+ }
+ union.setTag((byte) index);
+ }
+
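
To make the cachedKeys layout above concrete: with multiple distinct aggregates, every input row yields one key per distinct expression, all sharing the distribution-key prefix and differing only in the union tag and the distinct columns. A minimal sketch under an assumed two-distinct plan (the query, column names, and values are invented for illustration):

    import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;

    // Sketch: what cachedKeys holds for a hypothetical
    //   SELECT key, count(DISTINCT c1), count(DISTINCT c2) FROM t GROUP BY key
    // i.e. numDistributionKeys = 1, numDistinctExprs = 2.
    public class CachedKeysLayoutSketch {
      public static void main(String[] args) {
        Object distKey = "k1";                    // the evaluated distribution key
        Object[][] cachedKeys = new Object[2][2]; // one row per distinct expression
        cachedKeys[0][0] = distKey;
        cachedKeys[0][1] = new StandardUnion((byte) 0, new Object[] { "c1-val" });
        cachedKeys[1][0] = distKey;               // copied by forwardExtraDistinctRows
        cachedKeys[1][1] = new StandardUnion((byte) 1, new Object[] { "c2-val" });
        // Each row is then serialized to its own HiveKey and collected with the
        // same value, which is what forwardExtraDistinctRows does above.
      }
    }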
private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
throws HiveException, SerDeException {
for (int i = 0; i < valueEval.length; i++) {
@@ -308,10 +375,8 @@ public class VectorReduceSinkOperator ex
// Evaluate the HashCode
int keyHashCode = 0;
if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide
- // better
- // load balance. If the requirement is to have a single reducer, we
- // should set
+ // If no partition cols, just distribute the data uniformly to provide better
+ // load balance. If the requirement is to have a single reducer, we should set
// the number of reducers to 1.
// Use a constant seed to make the code deterministic.
if (random == null) {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Tue Nov 12 18:23:05 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.exec.Ex
import org.apache.hadoop.hive.ql.exec.FunctionInfo;
import org.apache.hadoop.hive.ql.exec.FunctionRegistry;
import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampUtils;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.InputExpressionType;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
@@ -76,6 +77,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
import org.apache.hadoop.hive.ql.udf.UDFConv;
import org.apache.hadoop.hive.ql.udf.UDFHex;
import org.apache.hadoop.hive.ql.udf.UDFOPNegative;
@@ -556,7 +558,9 @@ public class VectorizationContext {
List<ExprNodeDesc> childExpr, Mode mode) throws HiveException {
//First handle special cases
if (udf instanceof GenericUDFBetween) {
- return getBetweenFilterExpression(childExpr);
+ return getBetweenFilterExpression(childExpr, mode);
+ } else if (udf instanceof GenericUDFIn) {
+ return getInFilterExpression(childExpr);
} else if (udf instanceof GenericUDFBridge) {
VectorExpression v = getGenericUDFBridgeVectorExpression((GenericUDFBridge) udf, childExpr, mode);
if (v != null) {
@@ -579,6 +583,87 @@ public class VectorizationContext {
}
/**
+ * Create a filter expression for column IN ( <list-of-constants> )
+ * @param childExpr
+ * @return
+ */
+ private VectorExpression getInFilterExpression(List<ExprNodeDesc> childExpr)
+ throws HiveException {
+ ExprNodeDesc colExpr = childExpr.get(0);
+ String colType = colExpr.getTypeString();
+
+ // prepare arguments for createVectorExpression
+ List<ExprNodeDesc> childrenForInList =
+ foldConstantsForUnaryExprs(childExpr.subList(1, childExpr.size()));
+
+ /* This method assumes that the IN list has no NULL entries. That is enforced elsewhere,
+ * in the Vectorizer class. If NULL is passed in as a list entry, behavior is not defined.
+ * If, in the future, NULL values are allowed in the IN list, be sure to handle 3-valued
+ * logic correctly. E.g. NOT (col IN (null)) should be considered UNKNOWN, so that would
+ * become FALSE in the WHERE clause, and cause the row in question to be filtered out.
+ * See the discussion in Jira HIVE-5583.
+ */
+
+ VectorExpression expr = null;
+
+ // determine class
+ Class<?> cl = null;
+ if (isIntFamily(colType)) {
+ cl = FilterLongColumnInList.class;
+ long[] inVals = new long[childrenForInList.size()];
+ for (int i = 0; i != inVals.length; i++) {
+ inVals[i] = getIntFamilyScalarAsLong((ExprNodeConstantDesc) childrenForInList.get(i));
+ }
+ FilterLongColumnInList f = (FilterLongColumnInList)
+ createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION);
+ f.setInListValues(inVals);
+ expr = f;
+ } else if (colType.equals("timestamp")) {
+ cl = FilterLongColumnInList.class;
+ long[] inVals = new long[childrenForInList.size()];
+ for (int i = 0; i != inVals.length; i++) {
+ inVals[i] = getTimestampScalar(childrenForInList.get(i));
+ }
+ FilterLongColumnInList f = (FilterLongColumnInList)
+ createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION);
+ f.setInListValues(inVals);
+ expr = f;
+ } else if (colType.equals("string")) {
+ cl = FilterStringColumnInList.class;
+ byte[][] inVals = new byte[childrenForInList.size()][];
+ for (int i = 0; i != inVals.length; i++) {
+ inVals[i] = getStringScalarAsByteArray((ExprNodeConstantDesc) childrenForInList.get(i));
+ }
+ FilterStringColumnInList f = (FilterStringColumnInList)
+ createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION);
+ f.setInListValues(inVals);
+ expr = f;
+ } else if (isFloatFamily(colType)) {
+ cl = FilterDoubleColumnInList.class;
+ double[] inValsD = new double[childrenForInList.size()];
+ for (int i = 0; i != inValsD.length; i++) {
+ inValsD[i] = getNumericScalarAsDouble(childrenForInList.get(i));
+ }
+ FilterDoubleColumnInList f = (FilterDoubleColumnInList)
+ createVectorExpression(cl, childExpr.subList(0, 1), Mode.PROJECTION);
+ f.setInListValues(inValsD);
+ expr = f;
+ } else {
+ throw new HiveException("Type " + colType + " not supported for IN in vectorized mode");
+ }
+ return expr;
+ }
+
+ private byte[] getStringScalarAsByteArray(ExprNodeConstantDesc exprNodeConstantDesc)
+ throws HiveException {
+ Object o = getScalarValue(exprNodeConstantDesc);
+ if (!(o instanceof byte[])) {
+ throw new HiveException("Expected constant argument of type string");
+ }
+ return (byte[]) o;
+ }
+
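
The three-valued-logic caveat in the comment above deserves a concrete illustration: under SQL semantics, col IN (NULL) evaluates to UNKNOWN rather than FALSE, and NOT of UNKNOWN stays UNKNOWN, so the row must still be filtered out by the WHERE clause. A self-contained sketch, modeling UNKNOWN as a null Boolean (illustrative only, not Hive code):

    // Sketch: SQL three-valued logic for IN with NULL entries.
    public class ThreeValuedInSketch {
      // Returns TRUE, FALSE, or null (UNKNOWN) for: value IN (list)
      static Boolean sqlIn(Integer value, Integer[] list) {
        if (value == null) return null;               // NULL IN (...) is UNKNOWN
        boolean sawNull = false;
        for (Integer item : list) {
          if (item == null) { sawNull = true; continue; }
          if (item.equals(value)) return Boolean.TRUE;
        }
        return sawNull ? null : Boolean.FALSE;        // no match + NULL entry => UNKNOWN
      }

      static Boolean sqlNot(Boolean b) {
        return b == null ? null : !b;                 // NOT UNKNOWN is UNKNOWN
      }

      public static void main(String[] args) {
        Boolean notIn = sqlNot(sqlIn(5, new Integer[] { null })); // UNKNOWN
        // A WHERE clause keeps a row only when the predicate is TRUE:
        System.out.println("row kept: " + Boolean.TRUE.equals(notIn)); // false
      }
    }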
+ /**
* Invoke special handling for expressions that can't be vectorized by regular
* descriptor based lookup.
*/
@@ -673,9 +758,16 @@ public class VectorizationContext {
* needs to be done differently than the standard way where all arguments are
* passed to the VectorExpression constructor.
*/
- private VectorExpression getBetweenFilterExpression(List<ExprNodeDesc> childExpr)
+ private VectorExpression getBetweenFilterExpression(List<ExprNodeDesc> childExpr, Mode mode)
throws HiveException {
+ if (mode == Mode.PROJECTION) {
+
+ // Projection mode is not yet supported for [NOT] BETWEEN. Return null so Vectorizer
+ // knows to revert to row-at-a-time execution.
+ return null;
+ }
+
boolean notKeywordPresent = (Boolean) ((ExprNodeConstantDesc) childExpr.get(0)).getValue();
ExprNodeDesc colExpr = childExpr.get(1);
@@ -850,8 +942,38 @@ public class VectorizationContext {
}
}
- // Get a timestamp as a long in number of nanos, from a string constant.
+ private long getIntFamilyScalarAsLong(ExprNodeConstantDesc constDesc)
+ throws HiveException {
+ Object o = getScalarValue(constDesc);
+ if (o instanceof Integer) {
+ return (Integer) o;
+ } else if (o instanceof Long) {
+ return (Long) o;
+ }
+ throw new HiveException("Unexpected type when converting to long");
+ }
+
+ private double getNumericScalarAsDouble(ExprNodeDesc constDesc)
+ throws HiveException {
+ Object o = getScalarValue((ExprNodeConstantDesc) constDesc);
+ if (o instanceof Double) {
+ return (Double) o;
+ } else if (o instanceof Float) {
+ return (Float) o;
+ } else if (o instanceof Integer) {
+ return (Integer) o;
+ } else if (o instanceof Long) {
+ return (Long) o;
+ }
+ throw new HiveException("Unexpected type when converting to double");
+ }
+
+ // Get a timestamp as a long in nanoseconds, from a string constant or cast
private long getTimestampScalar(ExprNodeDesc expr) throws HiveException {
+ if (expr instanceof ExprNodeGenericFuncDesc &&
+ ((ExprNodeGenericFuncDesc) expr).getGenericUDF() instanceof GenericUDFTimestamp) {
+ return evaluateCastToTimestamp(expr);
+ }
if (!(expr instanceof ExprNodeConstantDesc)) {
throw new HiveException("Constant timestamp value expected for expression argument. " +
"Non-constant argument not supported for vectorization.");
@@ -868,25 +990,26 @@ public class VectorizationContext {
expr2.setChildren(children);
// initialize and evaluate
- ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(expr2);
- ObjectInspector output = evaluator.initialize(null);
- Object constant = evaluator.evaluate(null);
- Object java = ObjectInspectorUtils.copyToStandardJavaObject(constant, output);
-
- if (!(java instanceof Timestamp)) {
- throw new HiveException("Udf: failed to convert from string to timestamp");
- }
- Timestamp ts = (Timestamp) java;
- long result = ts.getTime();
- result *= 1000000; // shift left 6 digits to make room for nanos below ms precision
- result += ts.getNanos() % 1000000; // add in nanos, after removing the ms portion
- return result;
+ return evaluateCastToTimestamp(expr2);
}
throw new HiveException("Udf: unhandled constant type for scalar argument. "
+ "Expecting string.");
}
+ private long evaluateCastToTimestamp(ExprNodeDesc expr) throws HiveException {
+ ExprNodeGenericFuncDesc expr2 = (ExprNodeGenericFuncDesc) expr;
+ ExprNodeEvaluator evaluator = ExprNodeEvaluatorFactory.get(expr2);
+ ObjectInspector output = evaluator.initialize(null);
+ Object constant = evaluator.evaluate(null);
+ Object java = ObjectInspectorUtils.copyToStandardJavaObject(constant, output);
+
+ if (!(java instanceof Timestamp)) {
+ throw new HiveException("Udf: failed to convert to timestamp");
+ }
+ Timestamp ts = (Timestamp) java;
+ return TimestampUtils.getTimeNanoSec(ts);
+ }
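
The refactor above moves the inline nanosecond arithmetic (visible in the removed lines) into TimestampUtils.getTimeNanoSec. A standalone sketch of that conversion, assuming the utility matches the removed code exactly:

    import java.sql.Timestamp;

    // Sketch: Timestamp -> nanoseconds since epoch, reproducing the arithmetic
    // from the removed lines (assumed equivalent to TimestampUtils.getTimeNanoSec).
    public class TimestampNanosSketch {
      static long getTimeNanoSec(Timestamp ts) {
        long result = ts.getTime();        // millis since epoch, including whole-ms fraction
        result *= 1000000;                 // shift left 6 digits to make room for sub-ms nanos
        result += ts.getNanos() % 1000000; // add in nanos below ms precision
        return result;
      }

      public static void main(String[] args) {
        Timestamp ts = Timestamp.valueOf("2013-11-12 18:23:05.123456789");
        System.out.println(getTimeNanoSec(ts)); // tail of the value is ...123456789
      }
    }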
private Constructor<?> getConstructor(Class<?> cl) throws HiveException {
try {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Tue Nov 12 18:23:05 2013
@@ -341,30 +341,6 @@ public class HiveInputFormat<K extends W
return result.toArray(new HiveInputSplit[result.size()]);
}
- public void validateInput(JobConf job) throws IOException {
-
- init(job);
-
- Path[] dirs = FileInputFormat.getInputPaths(job);
- if (dirs.length == 0) {
- throw new IOException("No input paths specified in job");
- }
- JobConf newjob = new JobConf(job);
-
- // for each dir, get the InputFormat, and do validateInput.
- for (Path dir : dirs) {
- PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
- // create a new InputFormat instance if this is the first time to see this
- // class
- InputFormat inputFormat = getInputFormatFromCache(part
- .getInputFileFormatClass(), job);
-
- FileInputFormat.setInputPaths(newjob, dir);
- newjob.setInputFormat(inputFormat.getClass());
- ShimLoader.getHadoopShims().inputFormatValidateInput(inputFormat, newjob);
- }
- }
-
protected static PartitionDesc getPartitionDescFromPath(
Map<String, PartitionDesc> pathToPartitionInfo, Path dir)
throws IOException {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Tue Nov 12 18:23:05 2013
@@ -29,7 +29,10 @@ public class HiveKey extends BytesWritab
private static final int LENGTH_BYTES = 4;
- boolean hashCodeValid;
+ private int hashCode;
+ private boolean hashCodeValid;
+
+ private transient int distKeyLength;
public HiveKey() {
hashCodeValid = false;
@@ -37,15 +40,13 @@ public class HiveKey extends BytesWritab
public HiveKey(byte[] bytes, int hashcode) {
super(bytes);
- myHashCode = hashcode;
+ hashCode = hashcode;
hashCodeValid = true;
}
- protected int myHashCode;
-
public void setHashCode(int myHashCode) {
hashCodeValid = true;
- this.myHashCode = myHashCode;
+ hashCode = myHashCode;
}
@Override
@@ -54,7 +55,15 @@ public class HiveKey extends BytesWritab
throw new RuntimeException("Cannot get hashCode() from deserialized "
+ HiveKey.class);
}
- return myHashCode;
+ return hashCode;
+ }
+
+ public void setDistKeyLength(int distKeyLength) {
+ this.distKeyLength = distKeyLength;
+ }
+
+ public int getDistKeyLength() {
+ return distKeyLength;
}
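
The new distKeyLength records how many leading bytes of the serialized key form the distribution (grouping) key, so consumers such as the TopN hash can compare or hash only that prefix. A hedged usage sketch; the prefix-hashing consumer below is hypothetical, and only the HiveKey calls come from this diff:

    import org.apache.hadoop.hive.ql.io.HiveKey;

    // Sketch: hashing only the distribution-key prefix of a serialized HiveKey.
    public class DistKeyPrefixSketch {
      static int prefixHash(HiveKey key) {
        byte[] bytes = key.getBytes();    // BytesWritable backing array
        int len = key.getDistKeyLength(); // bytes belonging to the distribution key
        int hash = 0;
        for (int i = 0; i < len; i++) {
          hash = 31 * hash + bytes[i];    // simple prefix hash, illustrative only
        }
        return hash;
      }

      public static void main(String[] args) {
        HiveKey key = new HiveKey();
        byte[] serialized = { 1, 2, 3, 9, 9 }; // pretend: 3 dist-key bytes + 2 distinct bytes
        key.set(serialized, 0, serialized.length);
        key.setDistKeyLength(3);
        System.out.println(prefixHash(key));   // hashes only { 1, 2, 3 }
      }
    }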
/** A Comparator optimized for HiveKey. */
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/FileDump.java Tue Nov 12 18:23:05 2013
@@ -41,8 +41,17 @@ public final class FileDump {
System.out.println("Compression size: " + reader.getCompressionSize());
}
System.out.println("Type: " + reader.getObjectInspector().getTypeName());
+ System.out.println("\nStripe Statistics:");
+ Metadata metadata = reader.getMetadata();
+ for (int n = 0; n < metadata.getStripeStatistics().size(); n++) {
+ System.out.println(" Stripe " + (n + 1) + ":");
+ StripeStatistics ss = metadata.getStripeStatistics().get(n);
+ for (int i = 0; i < ss.getColumnStatistics().length; ++i) {
+ System.out.println(" Column " + i + ": " + ss.getColumnStatistics()[i].toString());
+ }
+ }
ColumnStatistics[] stats = reader.getStatistics();
- System.out.println("\nStatistics:");
+ System.out.println("\nFile Statistics:");
for(int i=0; i < stats.length; ++i) {
System.out.println(" Column " + i + ": " + stats[i].toString());
}
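
With this change the dump separates per-stripe statistics from whole-file statistics. Going only by the println calls above, the output takes roughly this shape (the statistics text is invented; the real content comes from ColumnStatistics.toString()):

    Stripe Statistics:
      Stripe 1:
        Column 0: count: 5000
        Column 1: min: a max: zzz
      Stripe 2:
        ...

    File Statistics:
      Column 0: count: 10000
      ...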
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Tue Nov 12 18:23:05 2013
@@ -104,7 +104,8 @@ public final class OrcFile {
public static final String ENABLE_INDEXES = "orc.create.index";
public static final String BLOCK_PADDING = "orc.block.padding";
- static final long DEFAULT_STRIPE_SIZE = 256 * 1024 * 1024;
+ static final long DEFAULT_STRIPE_SIZE =
+ HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.defaultLongVal;
static final CompressionKind DEFAULT_COMPRESSION_KIND =
CompressionKind.ZLIB;
static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
@@ -138,7 +139,7 @@ public final class OrcFile {
private final Configuration configuration;
private FileSystem fileSystemValue = null;
private ObjectInspector inspectorValue = null;
- private long stripeSizeValue = DEFAULT_STRIPE_SIZE;
+ private long stripeSizeValue;
private int rowIndexStrideValue = DEFAULT_ROW_INDEX_STRIDE;
private int bufferSizeValue = DEFAULT_BUFFER_SIZE;
private boolean blockPaddingValue = DEFAULT_BLOCK_PADDING;
@@ -149,6 +150,9 @@ public final class OrcFile {
WriterOptions(Configuration conf) {
configuration = conf;
memoryManagerValue = getMemoryManager(conf);
+ stripeSizeValue =
+ conf.getLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
+ DEFAULT_STRIPE_SIZE);
String versionName =
conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
if (versionName == null) {
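
After this change the writer's default stripe size comes from the Hive configuration rather than a hard-coded constant, so it can be tuned per job. A brief sketch of overriding it (assuming the standard HiveConf pattern; using the varname field avoids hard-coding the key string):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Sketch: overriding the ORC default stripe size that WriterOptions now reads.
    public class StripeSizeSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setLong(HiveConf.ConfVars.HIVE_ORC_DEFAULT_STRIPE_SIZE.varname,
            64L * 1024 * 1024); // ask for 64 MB stripes instead of the default
        // Any OrcFile.WriterOptions built from this conf picks up the new size.
      }
    }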
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java Tue Nov 12 18:23:05 2013
@@ -47,10 +47,12 @@ public class OrcSplit extends FileSplit
// serialize FileMetaInfo fields
Text.writeString(out, fileMetaInfo.compressionType);
WritableUtils.writeVInt(out, fileMetaInfo.bufferSize);
+ WritableUtils.writeVInt(out, fileMetaInfo.metadataSize);
// serialize FileMetaInfo field footer
ByteBuffer footerBuff = fileMetaInfo.footerBuffer;
footerBuff.reset();
+
// write length of buffer
WritableUtils.writeVInt(out, footerBuff.limit() - footerBuff.position());
out.write(footerBuff.array(), footerBuff.position(),
@@ -69,13 +71,14 @@ public class OrcSplit extends FileSplit
// deserialize FileMetaInfo fields
String compressionType = Text.readString(in);
int bufferSize = WritableUtils.readVInt(in);
+ int metadataSize = WritableUtils.readVInt(in);
// deserialize FileMetaInfo field footer
int footerBuffSize = WritableUtils.readVInt(in);
ByteBuffer footerBuff = ByteBuffer.allocate(footerBuffSize);
in.readFully(footerBuff.array(), 0, footerBuffSize);
- fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, footerBuff);
+ fileMetaInfo = new FileMetaInfo(compressionType, bufferSize, metadataSize, footerBuff);
}
}
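
Note that the new metadataSize field is written and read at matching positions in the stream; Writable round-trips depend on write(DataOutput) and readFields(DataInput) staying exactly symmetric. A minimal self-contained illustration of that property (not OrcSplit itself; the field values are invented):

    import java.io.*;
    import org.apache.hadoop.io.WritableUtils;

    // Sketch: vInt round-trip symmetry, the invariant the OrcSplit change keeps
    // by adding writeVInt(metadataSize) and readVInt() at matching positions.
    public class VIntRoundTrip {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        WritableUtils.writeVInt(out, 42);   // e.g. bufferSize
        WritableUtils.writeVInt(out, 1234); // newly added: metadataSize
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        System.out.println(WritableUtils.readVInt(in)); // 42
        System.out.println(WritableUtils.readVInt(in)); // 1234
      }
    }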
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java?rev=1541190&r1=1541189&r2=1541190&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java Tue Nov 12 18:23:05 2013
@@ -108,6 +108,13 @@ public interface Reader {
ColumnStatistics[] getStatistics();
/**
+ * Get the metadata information, such as stripe-level column statistics.
+ * @return the file metadata
+ * @throws IOException
+ */
+ Metadata getMetadata() throws IOException;
+
+ /**
* Get the list of types contained in the file. The root type is the first
* type in the list.
* @return the list of flattened types
@@ -122,10 +129,12 @@ public interface Reader {
class FileMetaInfo{
final String compressionType;
final int bufferSize;
+ final int metadataSize;
final ByteBuffer footerBuffer;
- FileMetaInfo(String compressionType, int bufferSize, ByteBuffer footerBuffer){
+ FileMetaInfo(String compressionType, int bufferSize, int metadataSize, ByteBuffer footerBuffer){
this.compressionType = compressionType;
this.bufferSize = bufferSize;
+ this.metadataSize = metadataSize;
this.footerBuffer = footerBuffer;
}
}