Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 00:26:58 UTC
svn commit: r1629544 [8/33] - in /hive/branches/spark-new: ./
accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/
bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/
common/src/test/org/apache/hadoop/hive/common/type/ co...
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorReduceSinkOperator.java Sun Oct 5 22:26:43 2014
@@ -18,99 +18,30 @@
package org.apache.hadoop.hive.ql.exec.vector;
-import java.io.IOException;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.PTFTopNHash;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TopNHash;
-import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
-import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
-import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.serde2.SerDeException;
-import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
-import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Text;
-// import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
public class VectorReduceSinkOperator extends ReduceSinkOperator {
- private static final Log LOG = LogFactory.getLog(
- VectorReduceSinkOperator.class.getName());
-
private static final long serialVersionUID = 1L;
- /**
- * The evaluators for the key columns. Key columns decide the sort order on
- * the reducer side. Key columns are passed to the reducer in the "key".
- */
- private VectorExpression[] keyEval;
-
- /**
- * The key value writers. These know how to write the necessary writable type
- * based on key column metadata, from the primitive vector type.
- */
- private transient VectorExpressionWriter[] keyWriters;
-
- /**
- * The evaluators for the value columns. Value columns are passed to reducer
- * in the "value".
- */
- private VectorExpression[] valueEval;
-
- /**
- * The output value writers. These know how to write the necessary writable type
- * based on value column metadata, from the primitive vector type.
- */
- private transient VectorExpressionWriter[] valueWriters;
-
- /**
- * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
- * Hive language). Partition columns decide the reducer that the current row
- * goes to. Partition columns are not passed to reducer.
- */
- private VectorExpression[] partitionEval;
-
- /**
- * Evaluators for bucketing columns. This is used to compute bucket number.
- */
- private VectorExpression[] bucketEval;
- private int buckColIdxInKey;
-
- /**
- * The partition value writers. These know how to write the necessary writable type
- * based on partition column metadata, from the primitive vector type.
- */
- private transient VectorExpressionWriter[] partitionWriters;
- private transient VectorExpressionWriter[] bucketWriters = null;
-
- private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+ // Writers for producing a row from the input batch.
+ private VectorExpressionWriter[] rowWriters;
+
+ protected transient Object[] singleRow;
public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
throws HiveException {
this();
ReduceSinkDesc desc = (ReduceSinkDesc) conf;
this.conf = desc;
- keyEval = vContext.getVectorExpressions(desc.getKeyCols());
- valueEval = vContext.getVectorExpressions(desc.getValueCols());
- partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
- bucketEval = null;
- if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
- bucketEval = vContext.getVectorExpressions(desc.getBucketCols());
- buckColIdxInKey = desc.getPartitionCols().size();
- }
}
public VectorReduceSinkOperator() {
@@ -119,399 +50,49 @@ public class VectorReduceSinkOperator ex
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
- try {
- numDistributionKeys = conf.getNumDistributionKeys();
- distinctColIndices = conf.getDistinctColumnIndices();
- numDistinctExprs = distinctColIndices.size();
-
- TableDesc keyTableDesc = conf.getKeySerializeInfo();
- keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
- .newInstance();
- keySerializer.initialize(null, keyTableDesc.getProperties());
- keyIsText = keySerializer.getSerializedClass().equals(Text.class);
-
- /*
- * Compute and assign the key writers and the key object inspector
- */
- VectorExpressionWriterFactory.processVectorExpressions(
- conf.getKeyCols(),
- conf.getOutputKeyColumnNames(),
- new VectorExpressionWriterFactory.SingleOIDClosure() {
- @Override
- public void assign(VectorExpressionWriter[] writers,
- ObjectInspector objectInspector) {
- keyWriters = writers;
- keyObjectInspector = objectInspector;
- }
- });
-
- String colNames = "";
- for(String colName : conf.getOutputKeyColumnNames()) {
- colNames = String.format("%s %s", colNames, colName);
- }
-
- if (isDebugEnabled) {
- LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
- keyObjectInspector.getClass(),
- keyObjectInspector,
- colNames));
- }
-
- partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
- if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
- bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols());
- }
-
- TableDesc valueTableDesc = conf.getValueSerializeInfo();
- valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
- .newInstance();
- valueSerializer.initialize(null, valueTableDesc.getProperties());
-
- /*
- * Compute and assign the value writers and the value object inspector
- */
- VectorExpressionWriterFactory.processVectorExpressions(
- conf.getValueCols(),
- conf.getOutputValueColumnNames(),
- new VectorExpressionWriterFactory.SingleOIDClosure() {
- @Override
- public void assign(VectorExpressionWriter[] writers,
- ObjectInspector objectInspector) {
- valueWriters = writers;
- valueObjectInspector = objectInspector;
+ // We need an input object inspector for the row we will extract from the
+ // vectorized row batch, not, for example, an original inspector for an ORC table, etc.
+ VectorExpressionWriterFactory.processVectorInspector(
+ (StructObjectInspector) inputObjInspectors[0],
+ new VectorExpressionWriterFactory.SingleOIDClosure() {
+ @Override
+ public void assign(VectorExpressionWriter[] writers,
+ ObjectInspector objectInspector) {
+ rowWriters = writers;
+ inputObjInspectors[0] = objectInspector;
}
- });
-
- if (isDebugEnabled) {
- colNames = "";
- for(String colName : conf.getOutputValueColumnNames()) {
- colNames = String.format("%s %s", colNames, colName);
- }
- }
-
- if (isDebugEnabled) {
- LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
- valueObjectInspector.getClass(),
- valueObjectInspector,
- colNames));
- }
+ });
+ singleRow = new Object[rowWriters.length];
- int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
- int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
- numDistributionKeys;
- cachedKeys = new Object[numKeys][keyLen];
- cachedValues = new Object[valueEval.length];
-
- int tag = conf.getTag();
- tagByte[0] = (byte) tag;
- LOG.info("Using tag = " + tag);
-
- int limit = conf.getTopN();
- float memUsage = conf.getTopNMemoryUsage();
- if (limit >= 0 && memUsage > 0) {
- reducerHash = conf.isPTFReduceSink() ? new PTFTopNHash() : reducerHash;
- reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
- }
-
- autoParallel = conf.isAutoParallel();
-
- } catch(Exception e) {
- throw new HiveException(e);
- }
+ // Call ReduceSinkOperator with new input inspector.
+ super.initializeOp(hconf);
}
@Override
- public void processOp(Object row, int tag) throws HiveException {
- VectorizedRowBatch vrg = (VectorizedRowBatch) row;
-
- if (isDebugEnabled) {
- LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
- vrg.size,
- valueEval.length,
- keyEval.length,
- partitionEval.length));
- }
-
- try {
- // Evaluate the keys
- for (int i = 0; i < keyEval.length; i++) {
- keyEval[i].evaluate(vrg);
- }
-
- // Determine which rows we need to emit based on topN optimization
- int startResult = reducerHash.startVectorizedBatch(vrg.size);
- if (startResult == TopNHash.EXCLUDE) {
- return; // TopN wants us to exclude all rows.
- }
- // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
- for (int i = 0; i < partitionEval.length; i++) {
- partitionEval[i].evaluate(vrg);
- }
- if (bucketEval != null) {
- for (int i = 0; i < bucketEval.length; i++) {
- bucketEval[i].evaluate(vrg);
- }
- }
- // run the vector evaluations
- for (int i = 0; i < valueEval.length; i++) {
- valueEval[i].evaluate(vrg);
- }
-
- boolean useTopN = startResult != TopNHash.FORWARD;
- // Go thru the batch once. If we are not using TopN, we will forward all things and be done.
- // If we are using topN, we will make the first key for each row and store/forward it.
- // Values, hashes and additional distinct rows will be handled in the 2nd pass in that case.
- for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
- int rowIndex = batchIndex;
- if (vrg.selectedInUse) {
- rowIndex = vrg.selected[batchIndex];
- }
- // First, make distrib key components for this row and determine distKeyLength.
- populatedCachedDistributionKeys(vrg, rowIndex, 0);
-
- // replace bucketing columns with hashcode % numBuckets
- int buckNum = -1;
- if (bucketEval != null) {
- buckNum = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
- cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
- }
- HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
- int distKeyLength = firstKey.getDistKeyLength();
- // Add first distinct expression, if any.
- if (numDistinctExprs > 0) {
- populateCachedDistinctKeys(vrg, rowIndex, 0);
- firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
- }
-
- final int hashCode;
-
- // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
- if (autoParallel && partitionEval.length > 0) {
- hashCode = computeMurmurHash(firstKey);
- } else {
- hashCode = computeHashCode(vrg, rowIndex, buckNum);
- }
-
- firstKey.setHashCode(hashCode);
-
- if (useTopN) {
- /*
- * in case of TopN for windowing, we need to distinguish between
- * rows with null partition keys and rows with value 0 for partition keys.
- */
- boolean partkeysNull = conf.isPTFReduceSink() && partitionKeysAreNull(vrg, rowIndex);
- reducerHash.tryStoreVectorizedKey(firstKey, partkeysNull, batchIndex);
- } else {
- // No TopN, just forward the first key and all others.
- BytesWritable value = makeValueWritable(vrg, rowIndex);
- collect(firstKey, value);
- forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
- }
- }
-
- if (!useTopN) return; // All done.
-
- // If we use topN, we have called tryStore on every key now. We can process the results.
- for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
- int result = reducerHash.getVectorizedBatchResult(batchIndex);
- if (result == TopNHash.EXCLUDE) continue;
- int rowIndex = batchIndex;
- if (vrg.selectedInUse) {
- rowIndex = vrg.selected[batchIndex];
- }
- // Compute value and hashcode - we'd either store or forward them.
- BytesWritable value = makeValueWritable(vrg, rowIndex);
- int distKeyLength = -1;
- int hashCode;
- if (result == TopNHash.FORWARD) {
- HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
- distKeyLength = firstKey.getDistKeyLength();
- hashCode = firstKey.hashCode();
- collect(firstKey, value);
- } else {
- hashCode = reducerHash.getVectorizedKeyHashCode(batchIndex);
- reducerHash.storeValue(result, hashCode, value, true);
- distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
- }
- // Now forward other the rows if there's multi-distinct (but see TODO in forward...).
- // Unfortunately, that means we will have to rebuild the cachedKeys. Start at 1.
- if (numDistinctExprs > 1) {
- populatedCachedDistributionKeys(vrg, rowIndex, 1);
- forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
- }
- }
- } catch (SerDeException e) {
- throw new HiveException(e);
- } catch (IOException e) {
- throw new HiveException(e);
- }
- }
-
- /**
- * This function creates and forwards all the additional KVs for the multi-distinct case,
- * after the first (0th) KV pertaining to the row has already been stored or forwarded.
- * @param vrg the batch
- * @param rowIndex the row index in the batch
- * @param hashCode the partitioning hash code to use; same as for the first KV
- * @param value the value to use; same as for the first KV
- * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
- * @param tag the tag
- * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
- */
- private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex,int hashCode,
- BytesWritable value, int distKeyLength, int tag, int baseIndex)
- throws HiveException, SerDeException, IOException {
- // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
- // the first key has already been stored. There's few bytes difference between keys
- // for different distincts, and the value/etc. are all the same.
- // We could store deltas to re-gen extra rows when flushing TopN.
- for (int i = 1; i < numDistinctExprs; i++) {
- if (i != baseIndex) {
- System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
- }
- populateCachedDistinctKeys(vrg, rowIndex, i);
- HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
- hiveKey.setHashCode(hashCode);
- collect(hiveKey, value);
- }
- }
-
- /**
- * Populate distribution keys part of cachedKeys for a particular row from the batch.
- * @param vrg the batch
- * @param rowIndex the row index in the batch
- * @param index the cachedKeys index to write to
- */
- private void populatedCachedDistributionKeys(
- VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
- for (int i = 0; i < numDistributionKeys; i++) {
- int batchColumn = keyEval[i].getOutputColumn();
- ColumnVector vectorColumn = vrg.cols[batchColumn];
- cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
- }
- if (cachedKeys[index].length > numDistributionKeys) {
- cachedKeys[index][numDistributionKeys] = null;
- }
- }
-
- /**
- * Populate distinct keys part of cachedKeys for a particular row from the batch.
- * @param vrg the batch
- * @param rowIndex the row index in the batch
- * @param index the cachedKeys index to write to
- */
- private void populateCachedDistinctKeys(
- VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
- StandardUnion union;
- cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
- (byte)index, new Object[distinctColIndices.get(index).size()]);
- Object[] distinctParameters = (Object[]) union.getObject();
- for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
- int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
- int batchColumn = keyEval[distinctColIndex].getOutputColumn();
- distinctParameters[distinctParamI] =
- keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
- }
- union.setTag((byte) index);
- }
+ public void processOp(Object data, int tag) throws HiveException {
+ VectorizedRowBatch vrg = (VectorizedRowBatch) data;
- private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
- throws HiveException, SerDeException {
- for (int i = 0; i < valueEval.length; i++) {
- int batchColumn = valueEval[i].getOutputColumn();
- ColumnVector vectorColumn = vrg.cols[batchColumn];
- cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
+ for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
+ Object row = getRowObject(vrg, batchIndex);
+ super.processOp(row, tag);
}
- // Serialize the value
- return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
}
- private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException {
- // Evaluate the HashCode
- int keyHashCode = 0;
- if (partitionEval.length == 0) {
- // If no partition cols, just distribute the data uniformly to provide better
- // load balance. If the requirement is to have a single reducer, we should set
- // the number of reducers to 1.
- // Use a constant seed to make the code deterministic.
- if (random == null) {
- random = new Random(12345);
- }
- keyHashCode = random.nextInt();
- } else {
- for (int p = 0; p < partitionEval.length; p++) {
- ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
- Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
- keyHashCode = keyHashCode
- * 31
- + ObjectInspectorUtils.hashCode(
- partitionValue,
- partitionWriters[p].getObjectInspector());
- }
- }
- return buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
- }
-
- private boolean partitionKeysAreNull(VectorizedRowBatch vrg, int rowIndex)
+ private Object[] getRowObject(VectorizedRowBatch vrg, int rowIndex)
throws HiveException {
- if (partitionEval.length != 0) {
- for (int p = 0; p < partitionEval.length; p++) {
- ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
- Object partitionValue = partitionWriters[p].writeValue(columnVector,
- rowIndex);
- if (partitionValue != null) {
- return false;
- }
+ int batchIndex = rowIndex;
+ if (vrg.selectedInUse) {
+ batchIndex = vrg.selected[rowIndex];
+ }
+ for (int i = 0; i < vrg.projectionSize; i++) {
+ ColumnVector vectorColumn = vrg.cols[vrg.projectedColumns[i]];
+ if (vectorColumn != null) {
+ singleRow[i] = rowWriters[i].writeValue(vectorColumn, batchIndex);
+ } else {
+ // Some columns from the table are not used; their column vectors are null.
+ singleRow[i] = null;
}
- return true;
- }
- return false;
- }
-
- private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
- int bucketNum = 0;
- for (int p = 0; p < bucketEval.length; p++) {
- ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()];
- Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex);
- bucketNum = bucketNum
- * 31
- + ObjectInspectorUtils.hashCode(
- bucketValue,
- bucketWriters[p].getObjectInspector());
}
-
- if (bucketNum < 0) {
- bucketNum = -1 * bucketNum;
- }
-
- return bucketNum % numBuckets;
- }
-
- static public String getOperatorName() {
- return "RS";
- }
-
- public VectorExpression[] getPartitionEval() {
- return partitionEval;
- }
-
- public void setPartitionEval(VectorExpression[] partitionEval) {
- this.partitionEval = partitionEval;
- }
-
- public VectorExpression[] getValueEval() {
- return valueEval;
- }
-
- public void setValueEval(VectorExpression[] valueEval) {
- this.valueEval = valueEval;
- }
-
- public VectorExpression[] getKeyEval() {
- return keyEval;
- }
-
- public void setKeyEval(VectorExpression[] keyEval) {
- this.keyEval = keyEval;
+ return singleRow;
}
}
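For reference, the rewritten operator stops doing its own key/value serialization and instead materializes one row at a time from the batch, delegating to the row-mode ReduceSinkOperator. A minimal standalone sketch of that batch-to-row pattern (the BatchRowIterationSketch class, RowSink interface, and forEachRow method are hypothetical names, not part of the patch):

    import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    final class BatchRowIterationSketch {
      interface RowSink { void process(Object[] row) throws HiveException; }

      // Walk a vectorized batch row by row, honoring the selected[] indirection,
      // materialize each projected column with a VectorExpressionWriter, and hand
      // the (reused) row buffer to a per-row sink.
      static void forEachRow(VectorizedRowBatch batch, VectorExpressionWriter[] writers,
          RowSink sink) throws HiveException {
        Object[] row = new Object[batch.projectionSize];
        for (int batchIndex = 0; batchIndex < batch.size; ++batchIndex) {
          // When selectedInUse is set, selected[] maps logical to physical positions.
          int rowIndex = batch.selectedInUse ? batch.selected[batchIndex] : batchIndex;
          for (int c = 0; c < batch.projectionSize; c++) {
            ColumnVector col = batch.cols[batch.projectedColumns[c]];
            row[c] = (col == null) ? null : writers[c].writeValue(col, rowIndex);
          }
          sink.process(row);  // the patch calls super.processOp(row, tag) here
        }
      }
    }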
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Sun Oct 5 22:26:43 2014
@@ -1889,47 +1889,47 @@ public class VectorizationContext {
// TODO: Also, investigate if different reduce-side versions are needed for var* and std*, or if map-side aggregate can be used. Right now they are conservatively
// marked map-side (HASH).
static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMinLong.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMaxLong.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
- add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class));
- add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class));
- add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMinLong.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, null, VectorUDAFMaxLong.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_DATETIME_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFCountMerge.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class));
+ add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class));
+ add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class));
+ add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class));
+ add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class));
+ add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class));
+ add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
+ add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
+ add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
+ add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
+ add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
+ add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
+ add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class));
+ add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class));
+ add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class));
+ add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_TIMESTAMP_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class));
+ add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class));
+ add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class));
}};
public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce)
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Sun Oct 5 22:26:43 2014
@@ -140,6 +140,20 @@ public class VectorizedRowBatchCtx {
/**
+ * Initializes the VectorizedRowBatch context based on a scratch column type map and an
+ * object inspector.
+ * @param columnTypeMap
+ * @param rowOI
+ * Object inspector that shapes the column types
+ */
+ public void init(Map<Integer, String> columnTypeMap,
+ StructObjectInspector rowOI) {
+ this.columnTypeMap = columnTypeMap;
+ this.rowOI = rowOI;
+ this.rawRowOI = rowOI;
+ }
+
+ /**
* Initializes VectorizedRowBatch context based on the
* split and Hive configuration (Job conf with hive Plan).
*
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/expressions/VectorExpressionWriterFactory.java Sun Oct 5 22:26:43 2014
@@ -1060,6 +1060,31 @@ public final class VectorExpressionWrite
closure.assign(writers, oids);
}
+ /**
+ * Creates the value writers for a struct object inspector.
+ * Creates an appropriate output object inspector.
+ */
+ public static void processVectorInspector(
+ StructObjectInspector structObjInspector,
+ SingleOIDClosure closure)
+ throws HiveException {
+ List<? extends StructField> fields = structObjInspector.getAllStructFieldRefs();
+ VectorExpressionWriter[] writers = new VectorExpressionWriter[fields.size()];
+ List<ObjectInspector> oids = new ArrayList<ObjectInspector>(writers.length);
+ ArrayList<String> columnNames = new ArrayList<String>();
+ int i = 0;
+ for(StructField field : fields) {
+ ObjectInspector fieldObjInsp = field.getFieldObjectInspector();
+ writers[i] = VectorExpressionWriterFactory.
+ genVectorExpressionWritable(fieldObjInsp);
+ columnNames.add(field.getFieldName());
+ oids.add(writers[i].getObjectInspector());
+ i++;
+ }
+ ObjectInspector objectInspector = ObjectInspectorFactory.
+ getStandardStructObjectInspector(columnNames, oids);
+ closure.assign(writers, objectInspector);
+ }
/**
* Returns {@link VectorExpressionWriter} objects for the fields in the given
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/hooks/HookContext.java Sun Oct 5 22:26:43 2014
@@ -147,7 +147,7 @@ public class HookContext {
}
public String getOperationName() {
- return SessionState.get().getHiveOperation().name();
+ return queryPlan.getOperationName();
}
public String getUserName() {
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java Sun Oct 5 22:26:43 2014
@@ -40,6 +40,8 @@ import java.util.regex.Pattern;
* are used by the compactor and cleaner and thus must be format agnostic.
*/
public class AcidUtils {
+ // This key will be put in the job conf when planning an ACID operation.
+ public static final String CONF_ACID_KEY = "hive.doing.acid";
public static final String BASE_PREFIX = "base_";
public static final String DELTA_PREFIX = "delta_";
public static final PathFilter deltaFileFilter = new PathFilter() {
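The new CONF_ACID_KEY marker is consumed later in this commit by OrcInputFormat.shouldSkipCombine(), which only tests for the key's presence. A small illustrative sketch of that set/check handshake (the value "true" and the helper names are arbitrary, not part of the patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    final class AcidConfKeySketch {
      // Planner side: mark the job conf when an ACID operation is being planned.
      static void markAcid(Configuration conf) {
        conf.set(AcidUtils.CONF_ACID_KEY, "true");  // key is "hive.doing.acid"
      }
      // Input-format side: presence of the key is the whole signal.
      static boolean isMarkedAcid(Configuration conf) {
        return conf.get(AcidUtils.CONF_ACID_KEY) != null;
      }
    }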
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java Sun Oct 5 22:26:43 2014
@@ -161,10 +161,11 @@ public abstract class HiveContextAwareRe
}
public IOContext getIOContext() {
- return IOContext.get();
+ return IOContext.get(jobConf.get(Utilities.INPUT_NAME));
}
- public void initIOContext(long startPos, boolean isBlockPointer, Path inputPath) {
+ private void initIOContext(long startPos, boolean isBlockPointer,
+ Path inputPath) {
ioCxtRef = this.getIOContext();
ioCxtRef.currentBlockStart = startPos;
ioCxtRef.isBlockPointer = isBlockPointer;
@@ -183,7 +184,7 @@ public abstract class HiveContextAwareRe
boolean blockPointer = false;
long blockStart = -1;
- FileSplit fileSplit = (FileSplit) split;
+ FileSplit fileSplit = split;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(job);
if (inputFormatClass.getName().contains("SequenceFile")) {
@@ -202,12 +203,15 @@ public abstract class HiveContextAwareRe
blockStart = in.getPosition();
in.close();
}
+ this.jobConf = job;
this.initIOContext(blockStart, blockPointer, path.makeQualified(fs));
this.initIOContextSortedProps(split, recordReader, job);
}
public void initIOContextSortedProps(FileSplit split, RecordReader recordReader, JobConf job) {
+ this.jobConf = job;
+
this.getIOContext().resetSortingValues();
this.isSorted = jobConf.getBoolean("hive.input.format.sorted", false);
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Sun Oct 5 22:26:43 2014
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.log.PerfLogger;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -253,7 +254,14 @@ public class HiveInputFormat<K extends W
}
protected void init(JobConf job) {
- mrwork = Utilities.getMapWork(job);
+ if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+ mrwork = (MapWork) Utilities.getMergeWork(job);
+ if (mrwork == null) {
+ mrwork = Utilities.getMapWork(job);
+ }
+ } else {
+ mrwork = Utilities.getMapWork(job);
+ }
pathToPartitionInfo = mrwork.getPathToPartitionInfo();
}
@@ -420,6 +428,9 @@ public class HiveInputFormat<K extends W
public static void pushFilters(JobConf jobConf, TableScanOperator tableScan) {
+ // ensure filters are not set from previous pushFilters
+ jobConf.unset(TableScanDesc.FILTER_TEXT_CONF_STR);
+ jobConf.unset(TableScanDesc.FILTER_EXPR_CONF_STR);
TableScanDesc scanDesc = tableScan.getConf();
if (scanDesc == null) {
return;
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/IOContext.java Sun Oct 5 22:26:43 2014
@@ -18,7 +18,14 @@
package org.apache.hadoop.hive.ql.io;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.optimizer.ConvertJoinMapJoin;
+import org.apache.hadoop.hive.ql.session.SessionState;
/**
@@ -31,14 +38,25 @@ import org.apache.hadoop.fs.Path;
*/
public class IOContext {
-
private static ThreadLocal<IOContext> threadLocal = new ThreadLocal<IOContext>(){
@Override
protected synchronized IOContext initialValue() { return new IOContext(); }
};
- public static IOContext get() {
- return IOContext.threadLocal.get();
+ private static Map<String, IOContext> inputNameIOContextMap = new HashMap<String, IOContext>();
+ private static IOContext ioContext = new IOContext();
+
+ public static Map<String, IOContext> getMap() {
+ return inputNameIOContextMap;
+ }
+
+ public static IOContext get(String inputName) {
+ if (!inputNameIOContextMap.containsKey(inputName)) {
+ IOContext ioContext = new IOContext();
+ inputNameIOContextMap.put(inputName, ioContext);
+ }
+
+ return inputNameIOContextMap.get(inputName);
}
public static void clear() {
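IOContext moves from one context per thread to one context per input name, created lazily on first lookup. A sketch of that get-or-create registry pattern (names hypothetical; the patch itself uses a plain HashMap, so if get() can be reached from multiple threads, a synchronized or concurrent map would be needed, as below):

    import java.util.HashMap;
    import java.util.Map;

    final class KeyedRegistrySketch<C> {
      interface Factory<T> { T create(); }

      private final Map<String, C> byName = new HashMap<String, C>();

      // Lazily create and cache one context per name.
      synchronized C get(String name, Factory<C> factory) {
        C ctx = byName.get(name);
        if (ctx == null) {
          ctx = factory.create();
          byName.put(name, ctx);
        }
        return ctx;
      }
    }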
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Sun Oct 5 22:26:43 2014
@@ -132,7 +132,7 @@ public class OrcInputFormat implements
@Override
public boolean shouldSkipCombine(Path path,
Configuration conf) throws IOException {
- return AcidUtils.isAcid(path, conf);
+ return (conf.get(AcidUtils.CONF_ACID_KEY) != null) || AcidUtils.isAcid(path, conf);
}
private static class OrcRecordReader
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewInputFormat.java Sun Oct 5 22:26:43 2014
@@ -118,13 +118,11 @@ public class OrcNewInputFormat extends I
public List<InputSplit> getSplits(JobContext jobContext)
throws IOException, InterruptedException {
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
- Configuration conf =
- ShimLoader.getHadoopShims().getConfiguration(jobContext);
List<OrcSplit> splits =
OrcInputFormat.generateSplitsInfo(ShimLoader.getHadoopShims()
.getConfiguration(jobContext));
- List<InputSplit> result = new ArrayList<InputSplit>();
- for(OrcSplit split: OrcInputFormat.generateSplitsInfo(conf)) {
+ List<InputSplit> result = new ArrayList<InputSplit>(splits.size());
+ for(OrcSplit split: splits) {
result.add(new OrcNewSplit(split));
}
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.ORC_GET_SPLITS);
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerWriterV2.java Sun Oct 5 22:26:43 2014
@@ -418,138 +418,120 @@ class RunLengthIntegerWriterV2 implement
private void determineEncoding() {
- int idx = 0;
+ // we need to compute zigzag values for DIRECT encoding if we decide to
+ // break early for delta overflows or for shorter runs
+ computeZigZagLiterals();
- // for identifying monotonic sequences
- boolean isIncreasing = false;
- int increasingCount = 1;
- boolean isDecreasing = false;
- int decreasingCount = 1;
+ zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0);
- // for identifying type of delta encoding
- min = literals[0];
- long max = literals[0];
- isFixedDelta = true;
- long currDelta = 0;
+ // determining the encoding is not a big win for shorter runs
+ if (numLiterals <= MIN_REPEAT) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
- min = literals[0];
- long deltaMax = 0;
+ // DELTA encoding check
- // populate all variables to identify the encoding type
- if (numLiterals >= 1) {
- currDelta = literals[1] - literals[0];
- for(int i = 0; i < numLiterals; i++) {
- if (i > 0 && literals[i] >= max) {
- max = literals[i];
- increasingCount++;
- }
+ // for identifying monotonic sequences
+ boolean isIncreasing = true;
+ boolean isDecreasing = true;
+ this.isFixedDelta = true;
- if (i > 0 && literals[i] <= min) {
- min = literals[i];
- decreasingCount++;
- }
+ this.min = literals[0];
+ long max = literals[0];
+ final long initialDelta = literals[1] - literals[0];
+ long currDelta = initialDelta;
+ long deltaMax = initialDelta;
+ this.adjDeltas[0] = initialDelta;
+
+ for (int i = 1; i < numLiterals; i++) {
+ final long l1 = literals[i];
+ final long l0 = literals[i - 1];
+ currDelta = l1 - l0;
+ min = Math.min(min, l1);
+ max = Math.max(max, l1);
+
+ isIncreasing &= (l0 <= l1);
+ isDecreasing &= (l0 >= l1);
+
+ isFixedDelta &= (currDelta == initialDelta);
+ if (i > 1) {
+ adjDeltas[i - 1] = Math.abs(currDelta);
+ deltaMax = Math.max(deltaMax, adjDeltas[i - 1]);
+ }
+ }
- // if delta doesn't changes then mark it as fixed delta
- if (i > 0 && isFixedDelta) {
- if (literals[i] - literals[i - 1] != currDelta) {
- isFixedDelta = false;
- }
+ // it's faster to exit under the delta overflow condition without checking for
+ // the PATCHED_BASE condition, as DIRECT encoding is faster and has less
+ // overhead than PATCHED_BASE
+ if (!utils.isSafeSubtract(max, min)) {
+ encoding = EncodingType.DIRECT;
+ return;
+ }
- fixedDelta = currDelta;
- }
+ // invariant - subtracting any number from any other in the literals after
+ // this point won't overflow
- // populate zigzag encoded literals
- long zzEncVal = 0;
- if (signed) {
- zzEncVal = utils.zigzagEncode(literals[i]);
- } else {
- zzEncVal = literals[i];
- }
- zigzagLiterals[idx] = zzEncVal;
- idx++;
+ // if initialDelta is 0 then we cannot delta encode as we cannot identify
+ // the sign of deltas (increasing or decreasing)
+ if (initialDelta != 0) {
+
+ // if min equals max then the delta is 0; this happens for runs of fixed
+ // values longer than 10, which cannot be encoded with SHORT_REPEAT
+ if (min == max) {
+ assert isFixedDelta : min + "==" + max +
+ ", isFixedDelta cannot be false";
+ assert currDelta == 0 : min + "==" + max + ", currDelta should be zero";
+ fixedDelta = 0;
+ encoding = EncodingType.DELTA;
+ return;
+ }
- // max delta value is required for computing the fixed bits
- // required for delta blob in delta encoding
- if (i > 0) {
- if (i == 1) {
- // first value preserve the sign
- adjDeltas[i - 1] = literals[i] - literals[i - 1];
- } else {
- adjDeltas[i - 1] = Math.abs(literals[i] - literals[i - 1]);
- if (adjDeltas[i - 1] > deltaMax) {
- deltaMax = adjDeltas[i - 1];
- }
- }
- }
+ if (isFixedDelta) {
+ assert currDelta == initialDelta
+ : "currDelta should be equal to initialDelta for fixed delta encoding";
+ encoding = EncodingType.DELTA;
+ fixedDelta = currDelta;
+ return;
}
// stores the number of bits required for packing delta blob in
// delta encoding
bitsDeltaMax = utils.findClosestNumBits(deltaMax);
- // if decreasing count equals total number of literals then the
- // sequence is monotonically decreasing
- if (increasingCount == 1 && decreasingCount == numLiterals) {
- isDecreasing = true;
- }
-
- // if increasing count equals total number of literals then the
- // sequence is monotonically increasing
- if (decreasingCount == 1 && increasingCount == numLiterals) {
- isIncreasing = true;
+ // monotonic condition
+ if (isIncreasing || isDecreasing) {
+ encoding = EncodingType.DELTA;
+ return;
}
}
- // if the sequence is both increasing and decreasing then it is not
- // monotonic
- if (isDecreasing && isIncreasing) {
- isDecreasing = false;
- isIncreasing = false;
- }
-
- // fixed delta condition
- if (isIncreasing == false && isDecreasing == false && isFixedDelta == true) {
- encoding = EncodingType.DELTA;
- return;
- }
-
- // monotonic condition
- if (isIncreasing || isDecreasing) {
- encoding = EncodingType.DELTA;
- return;
- }
+ // PATCHED_BASE encoding check
// percentile values are computed for the zigzag encoded values. if the
// number of bit requirement between 90th and 100th percentile varies
// beyond a threshold then we need to patch the values. if the variation
- // is not significant then we can use direct or delta encoding
-
- double p = 0.9;
- zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p);
-
- p = 1.0;
- zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, p);
+ // is not significant then we can use direct encoding
+ zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9);
int diffBitsLH = zzBits100p - zzBits90p;
// if the difference between 90th percentile and 100th percentile fixed
// bits is > 1 then we need patch the values
- if (isIncreasing == false && isDecreasing == false && diffBitsLH > 1
- && isFixedDelta == false) {
+ if (diffBitsLH > 1) {
+
// patching is done only on base reduced values.
// remove base from literals
- for(int i = 0; i < numLiterals; i++) {
+ for (int i = 0; i < numLiterals; i++) {
baseRedLiterals[i] = literals[i] - min;
}
// 95th percentile width is used to determine max allowed value
// after which patching will be done
- p = 0.95;
- brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p);
+ brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95);
// 100th percentile is used to compute the max patch width
- p = 1.0;
- brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, p);
+ brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0);
// after base reducing the values, if the difference in bits between
// 95th percentile and 100th percentile value is zero then there
@@ -565,19 +547,24 @@ class RunLengthIntegerWriterV2 implement
encoding = EncodingType.DIRECT;
return;
}
- }
-
- // if difference in bits between 95th percentile and 100th percentile is
- // 0, then patch length will become 0. Hence we will fallback to direct
- if (isIncreasing == false && isDecreasing == false && diffBitsLH <= 1
- && isFixedDelta == false) {
+ } else {
+ // if difference in bits between 95th percentile and 100th percentile is
+ // 0, then patch length will become 0. Hence we will fall back to direct
encoding = EncodingType.DIRECT;
return;
}
+ }
- // this should not happen
- if (encoding == null) {
- throw new RuntimeException("Integer encoding cannot be determined.");
+ private void computeZigZagLiterals() {
+ // populate zigzag encoded literals
+ long zzEncVal = 0;
+ for (int i = 0; i < numLiterals; i++) {
+ if (signed) {
+ zzEncVal = utils.zigzagEncode(literals[i]);
+ } else {
+ zzEncVal = literals[i];
+ }
+ zigzagLiterals[i] = zzEncVal;
}
}
@@ -700,7 +687,7 @@ class RunLengthIntegerWriterV2 implement
patchWidth = 0;
gapVsPatchList = null;
min = 0;
- isFixedDelta = false;
+ isFixedDelta = true;
}
@Override
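For reference, the zigzag transform that computeZigZagLiterals() applies via utils.zigzagEncode maps signed values to small unsigned values (0, -1, 1, -2, 2, ... become 0, 1, 2, 3, 4, ...), so percentileBits() sees tight bit widths for small-magnitude data. A standalone sketch of the standard protobuf-style transform, which the Hive helper is assumed to match:

    final class ZigZagSketch {
      static long zigzagEncode(long val) {
        return (val << 1) ^ (val >> 63);   // arithmetic shift replicates the sign bit
      }
      static long zigzagDecode(long enc) {
        return (enc >>> 1) ^ -(enc & 1);   // logical shift; restore sign from bit 0
      }
      public static void main(String[] args) {
        for (long v : new long[] {0, -1, 1, -2, 2, Long.MIN_VALUE, Long.MAX_VALUE}) {
          assert zigzagDecode(zigzagEncode(v)) == v;
          System.out.println(v + " -> " + zigzagEncode(v));
        }
      }
    }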
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/orc/SerializationUtils.java Sun Oct 5 22:26:43 2014
@@ -1283,4 +1283,9 @@ final class SerializationUtils {
+ ((readBuffer[rbOffset + 7] & 255) << 0));
}
+ // Do not want to use Guava LongMath.checkedSubtract() here as it will throw
+ // ArithmeticException in case of overflow
+ public boolean isSafeSubtract(long left, long right) {
+ return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
+ }
}
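isSafeSubtract() reports whether left - right can be computed without 64-bit overflow: the subtraction can only overflow when the operands have opposite signs and the result's sign differs from left's. A quick illustrative check (the class and main are just a harness, not part of the patch):

    final class SafeSubtractSketch {
      // Same expression as the new SerializationUtils helper above.
      static boolean isSafeSubtract(long left, long right) {
        return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
      }
      public static void main(String[] args) {
        System.out.println(isSafeSubtract(Long.MAX_VALUE, -1)); // false: would wrap to Long.MIN_VALUE
        System.out.println(isSafeSubtract(Long.MIN_VALUE, 1));  // false: would wrap to Long.MAX_VALUE
        System.out.println(isSafeSubtract(Long.MAX_VALUE, 1));  // true: same sign is always safe
        System.out.println(isSafeSubtract(-5L, 10L));           // true: -15 fits in a long
      }
    }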
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ArrayWritableGroupConverter.java Sun Oct 5 22:26:43 2014
@@ -13,9 +13,6 @@
*/
package org.apache.hadoop.hive.ql.io.parquet.convert;
-import java.util.List;
-
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
@@ -33,7 +30,7 @@ public class ArrayWritableGroupConverter
private Writable[] mapPairContainer;
public ArrayWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent,
- final int index, List<TypeInfo> hiveSchemaTypeInfos) {
+ final int index) {
this.parent = parent;
this.index = index;
int count = groupType.getFieldCount();
@@ -43,8 +40,7 @@ public class ArrayWritableGroupConverter
isMap = count == 2;
converters = new Converter[count];
for (int i = 0; i < count; i++) {
- converters[i] = getConverterFromDescription(groupType.getType(i), i, this,
- hiveSchemaTypeInfos);
+ converters[i] = getConverterFromDescription(groupType.getType(i), i, this);
}
}
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableGroupConverter.java Sun Oct 5 22:26:43 2014
@@ -16,7 +16,6 @@ package org.apache.hadoop.hive.ql.io.par
import java.util.ArrayList;
import java.util.List;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.Writable;
@@ -37,21 +36,19 @@ public class DataWritableGroupConverter
private final Object[] currentArr;
private Writable[] rootMap;
- public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema,
- final List<TypeInfo> hiveSchemaTypeInfos) {
- this(requestedSchema, null, 0, tableSchema, hiveSchemaTypeInfos);
+ public DataWritableGroupConverter(final GroupType requestedSchema, final GroupType tableSchema) {
+ this(requestedSchema, null, 0, tableSchema);
final int fieldCount = tableSchema.getFieldCount();
this.rootMap = new Writable[fieldCount];
}
public DataWritableGroupConverter(final GroupType groupType, final HiveGroupConverter parent,
- final int index, final List<TypeInfo> hiveSchemaTypeInfos) {
- this(groupType, parent, index, groupType, hiveSchemaTypeInfos);
+ final int index) {
+ this(groupType, parent, index, groupType);
}
public DataWritableGroupConverter(final GroupType selectedGroupType,
- final HiveGroupConverter parent, final int index, final GroupType containingGroupType,
- final List<TypeInfo> hiveSchemaTypeInfos) {
+ final HiveGroupConverter parent, final int index, final GroupType containingGroupType) {
this.parent = parent;
this.index = index;
final int totalFieldCount = containingGroupType.getFieldCount();
@@ -65,8 +62,7 @@ public class DataWritableGroupConverter
Type subtype = selectedFields.get(i);
if (containingGroupType.getFields().contains(subtype)) {
converters[i] = getConverterFromDescription(subtype,
- containingGroupType.getFieldIndex(subtype.getName()), this,
- hiveSchemaTypeInfos);
+ containingGroupType.getFieldIndex(subtype.getName()), this);
} else {
throw new IllegalStateException("Group type [" + containingGroupType +
"] does not contain requested field: " + subtype);
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java Sun Oct 5 22:26:43 2014
@@ -31,10 +31,8 @@ public class DataWritableRecordConverter
private final DataWritableGroupConverter root;
- public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema,
- final List<TypeInfo> hiveColumnTypeInfos) {
- this.root = new DataWritableGroupConverter(requestedSchema, tableSchema,
- hiveColumnTypeInfos);
+ public DataWritableRecordConverter(final GroupType requestedSchema, final GroupType tableSchema) {
+ this.root = new DataWritableGroupConverter(requestedSchema, tableSchema);
}
@Override
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java Sun Oct 5 22:26:43 2014
@@ -16,19 +16,12 @@ package org.apache.hadoop.hive.ql.io.par
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.ArrayList;
-import java.util.List;
-import org.apache.hadoop.hive.common.type.HiveChar;
-import org.apache.hadoop.hive.common.type.HiveVarchar;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
-import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
-import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
import org.apache.hadoop.hive.serde2.io.TimestampWritable;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
@@ -152,32 +145,6 @@ public enum ETypeConverter {
}
};
}
- },
- ECHAR_CONVERTER(HiveCharWritable.class) {
- @Override
- Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
- return new BinaryConverter<HiveCharWritable>(type, parent, index) {
- @Override
- protected HiveCharWritable convert(Binary binary) {
- HiveChar hiveChar = new HiveChar();
- hiveChar.setValue(binary.toStringUsingUTF8());
- return new HiveCharWritable(hiveChar);
- }
- };
- }
- },
- EVARCHAR_CONVERTER(HiveVarcharWritable.class) {
- @Override
- Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
- return new BinaryConverter<HiveVarcharWritable>(type, parent, index) {
- @Override
- protected HiveVarcharWritable convert(Binary binary) {
- HiveVarchar hiveVarchar = new HiveVarchar();
- hiveVarchar.setValue(binary.toStringUsingUTF8());
- return new HiveVarcharWritable(hiveVarchar);
- }
- };
- }
};
final Class<?> _type;
@@ -193,7 +160,7 @@ public enum ETypeConverter {
abstract Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent);
public static Converter getNewConverter(final PrimitiveType type, final int index,
- final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
+ final HiveGroupConverter parent) {
if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) {
//TODO - clean up once Parquet supports the Timestamp type annotation.
return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent);
@@ -201,15 +168,7 @@ public enum ETypeConverter {
if (OriginalType.DECIMAL == type.getOriginalType()) {
return EDECIMAL_CONVERTER.getConverter(type, index, parent);
} else if (OriginalType.UTF8 == type.getOriginalType()) {
- if (hiveSchemaTypeInfos.get(index).getTypeName()
- .startsWith(serdeConstants.CHAR_TYPE_NAME)) {
- return ECHAR_CONVERTER.getConverter(type, index, parent);
- } else if (hiveSchemaTypeInfos.get(index).getTypeName()
- .startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
- return EVARCHAR_CONVERTER.getConverter(type, index, parent);
- } else if (type.isPrimitive()) {
- return ESTRING_CONVERTER.getConverter(type, index, parent);
- }
+ return ESTRING_CONVERTER.getConverter(type, index, parent);
}
Class<?> javaType = type.getPrimitiveTypeName().javaType;
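
With the char/varchar converters removed, UTF8 columns always materialize as Text here; producing HiveChar/HiveVarchar values from such text is then a separate, later step. A hedged sketch of that downstream conversion (illustrative only, not the code path removed in this diff):

    import org.apache.hadoop.hive.common.type.HiveChar;
    import org.apache.hadoop.hive.common.type.HiveVarchar;
    import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
    import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
    import org.apache.hadoop.io.Text;

    Text raw = new Text("hello world");
    // char(5): padded/truncated to the declared length
    HiveCharWritable c = new HiveCharWritable(new HiveChar(raw.toString(), 5));
    // varchar(5): truncated to the declared length
    HiveVarcharWritable v = new HiveVarcharWritable(new HiveVarchar(raw.toString(), 5));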
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java Sun Oct 5 22:26:43 2014
@@ -13,9 +13,6 @@
*/
package org.apache.hadoop.hive.ql.io.parquet.convert;
-import java.util.List;
-
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.io.Writable;
import parquet.io.api.Converter;
@@ -26,20 +23,17 @@ import parquet.schema.Type.Repetition;
public abstract class HiveGroupConverter extends GroupConverter {
protected static Converter getConverterFromDescription(final Type type, final int index,
- final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
+ final HiveGroupConverter parent) {
if (type == null) {
return null;
}
if (type.isPrimitive()) {
- return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent,
- hiveSchemaTypeInfos);
+ return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent);
} else {
if (type.asGroupType().getRepetition() == Repetition.REPEATED) {
- return new ArrayWritableGroupConverter(type.asGroupType(), parent, index,
- hiveSchemaTypeInfos);
+ return new ArrayWritableGroupConverter(type.asGroupType(), parent, index);
} else {
- return new DataWritableGroupConverter(type.asGroupType(), parent, index,
- hiveSchemaTypeInfos);
+ return new DataWritableGroupConverter(type.asGroupType(), parent, index);
}
}
}
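
The dispatch above is purely structural: primitive types go to ETypeConverter, repeated groups become array converters, and all other groups become struct-like converters. A small sketch of how a Parquet schema decomposes into those three cases (the schema text is hypothetical):

    import parquet.schema.MessageType;
    import parquet.schema.MessageTypeParser;
    import parquet.schema.Type;
    import parquet.schema.Type.Repetition;

    MessageType schema = MessageTypeParser.parseMessageType(
        "message doc {"
        + "  required binary name (UTF8);"                           // primitive -> ETypeConverter
        + "  repeated group tags { required binary tag (UTF8); }"    // repeated -> ArrayWritableGroupConverter
        + "  required group address { required binary city (UTF8); }" // group -> DataWritableGroupConverter
        + "}");
    boolean primitive = schema.getType(0).isPrimitive();                                       // true
    boolean repeated = schema.getType(1).asGroupType().getRepetition() == Repetition.REPEATED; // true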
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Sun Oct 5 22:26:43 2014
@@ -14,7 +14,6 @@
package org.apache.hadoop.hive.ql.io.parquet.read;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -24,8 +23,6 @@ import org.apache.hadoop.hive.ql.io.IOCo
import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
-import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.util.StringUtils;
@@ -56,7 +53,7 @@ public class DataWritableReadSupport ext
* From a comma-separated string of column names (including Hive virtual columns),
* return a list of column names
*
- * @param comma separated list of columns
+ * @param columns comma separated list of columns
* @return list with virtual columns removed
*/
private static List<String> getColumns(final String columns) {
@@ -64,27 +61,6 @@ public class DataWritableReadSupport ext
removeVirtualColumns(StringUtils.getStringCollection(columns));
}
- private static List<TypeInfo> getColumnTypes(Configuration configuration) {
-
- List<String> columnNames;
- String columnNamesProperty = configuration.get(IOConstants.COLUMNS);
- if (columnNamesProperty.length() == 0) {
- columnNames = new ArrayList<String>();
- } else {
- columnNames = Arrays.asList(columnNamesProperty.split(","));
- }
- List<TypeInfo> columnTypes;
- String columnTypesProperty = configuration.get(IOConstants.COLUMNS_TYPES);
- if (columnTypesProperty.length() == 0) {
- columnTypes = new ArrayList<TypeInfo>();
- } else {
- columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypesProperty);
- }
-
- columnTypes = VirtualColumn.removeVirtualColumnTypes(columnNames, columnTypes);
- return columnTypes;
- }
-
/**
*
* It creates the readContext for Parquet side with the requested schema during the init phase.
@@ -173,8 +149,7 @@ public class DataWritableReadSupport ext
}
final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
- return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema,
- getColumnTypes(configuration));
+ return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
}
/**
@@ -194,4 +169,4 @@ public class DataWritableReadSupport ext
}
return requestedSchema;
}
-}
+}
\ No newline at end of file
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java Sun Oct 5 22:26:43 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.lockmg
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.thrift.TException;
@@ -42,10 +43,10 @@ public class DbLockManager implements Hi
private static final long MAX_SLEEP = 15000;
private HiveLockManagerCtx context;
private Set<DbHiveLock> locks;
- private HiveMetaStoreClient client;
+ private IMetaStoreClient client;
private long nextSleep = 50;
- DbLockManager(HiveMetaStoreClient client) {
+ DbLockManager(IMetaStoreClient client) {
locks = new HashSet<DbHiveLock>();
this.client = client;
}
@@ -210,8 +211,8 @@ public class DbLockManager implements Hi
/**
* Clear the memory of the locks in this object. This won't clear the locks from the database.
* It is for use with
- * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).commitTxn} and
- * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).rollbackTxn}.
+ * {@link DbTxnManager#commitTxn()} and
+ * {@link DbTxnManager#rollbackTxn()}.
*/
void clearLocalLockRecords() {
locks.clear();
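
The nextSleep/MAX_SLEEP fields above drive the manager's polling backoff while a lock request sits in the WAITING state: the sleep roughly doubles on each recheck, capped at MAX_SLEEP. A hedged sketch of that pattern (not the exact Hive method):

    private long nextSleep = 50;
    private static final long MAX_SLEEP = 15000;

    // Roughly double the wait between lock-state polls, capped at MAX_SLEEP.
    private void backoff() {
      nextSleep *= 2;
      if (nextSleep > MAX_SLEEP) {
        nextSleep = MAX_SLEEP;
      }
      try {
        Thread.sleep(nextSleep);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt flag
      }
    }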
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Sun Oct 5 22:26:43 2014
@@ -31,6 +31,8 @@ import org.apache.hadoop.hive.ql.QueryPl
import org.apache.hadoop.hive.ql.hooks.Entity;
import org.apache.hadoop.hive.ql.hooks.ReadEntity;
import org.apache.hadoop.hive.ql.hooks.WriteEntity;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.thrift.TException;
@@ -46,7 +48,7 @@ public class DbTxnManager extends HiveTx
static final private Log LOG = LogFactory.getLog(CLASS_NAME);
private DbLockManager lockMgr = null;
- private HiveMetaStoreClient client = null;
+ private IMetaStoreClient client = null;
private long txnId = 0;
DbTxnManager() {
@@ -284,7 +286,7 @@ public class DbTxnManager extends HiveTx
public ValidTxnList getValidTxns() throws LockException {
init();
try {
- return client.getValidTxns();
+ return client.getValidTxns(txnId);
} catch (TException e) {
throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
e);
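
Passing the current txnId to getValidTxns() asks the metastore to treat the caller's own open transaction as valid in the returned snapshot, so a transaction can read its own writes; the zero-argument overload would report it invalid like any other open transaction. In outline (client and txnId are the surrounding class's fields; the exact ValidTxnList accessor name is assumed):

    ValidTxnList snapshot = client.getValidTxns(txnId);
    // The caller's own transaction is reported valid in this snapshot,
    // so reads within the transaction see its earlier writes,
    // e.g. snapshot.isTxnValid(txnId) would be expected to return true.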
@@ -311,7 +313,6 @@ public class DbTxnManager extends HiveTx
try {
if (txnId > 0) rollbackTxn();
if (lockMgr != null) lockMgr.close();
- if (client != null) client.close();
} catch (Exception e) {
LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage()
+ ">, swallowing as there is nothing we can do with it.");
@@ -326,10 +327,12 @@ public class DbTxnManager extends HiveTx
"methods.");
}
try {
- client = new HiveMetaStoreClient(conf);
+ Hive db = Hive.get(conf);
+ client = db.getMSC();
} catch (MetaException e) {
- throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(),
- e);
+ throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
+ } catch (HiveException e) {
+ throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
}
}
}
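
The init() change borrows the session's cached metastore client via Hive.get(conf).getMSC() instead of constructing a private HiveMetaStoreClient, which is also why the destruct path above no longer calls client.close(): the client is owned and recycled by the per-thread Hive session object. In outline:

    Hive db = Hive.get(conf);           // per-thread cached Hive session
    IMetaStoreClient msc = db.getMSC(); // shared client, lazily (re)connected
    // Do not close msc here; closing it would break other users of the session.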