Posted to commits@hive.apache.org by ha...@apache.org on 2013/06/12 00:03:32 UTC
svn commit: r1491988 - in /hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql: exec/vector/ io/ io/orc/
Author: hashutosh
Date: Tue Jun 11 22:03:31 2013
New Revision: 1491988
URL: http://svn.apache.org/r1491988
Log:
HIVE-4706 : Query on Table with partition columns fails with AlreadyBeingCreatedException (Sarvesh Sakalanaga via Ashutosh Chauhan)
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Jun 11 22:03:31 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.v
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.ExecDriver;
import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.stats.StatsPublisher;
import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -597,7 +600,7 @@ public class VectorFileSinkOperator exte
}
/* Create list bucketing sub-directory only if stored-as-directories is on. */
String lbDirName = null;
- //lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
+ lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
FSPaths fpaths;
@@ -682,6 +685,52 @@ public class VectorFileSinkOperator exte
}
/**
+ * Generate list bucketing directory name from a row.
+ * @param row row to process.
+ * @return directory name.
+ */
+ private String generateListBucketingDirName(Object row) {
+ if (!this.isSkewedStoredAsSubDirectories) {
+ return null;
+ }
+
+ String lbDirName = null;
+ List<Object> standObjs = new ArrayList<Object>();
+ List<String> skewedCols = lbCtx.getSkewedColNames();
+ List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
+ List<String> skewedValsCandidate = null;
+ Map<SkewedValueList, String> locationMap = lbCtx.getLbLocationMap();
+
+ /* Convert input row to standard objects. */
+ ObjectInspectorUtils.copyToStandardObject(standObjs, row,
+ (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
+
+ assert (standObjs.size() >= skewedCols.size()) :
+ "The row has fewer columns than the number of skewed columns.";
+
+ skewedValsCandidate = new ArrayList<String>(skewedCols.size());
+ for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) {
+ skewedValsCandidate.add(posPair.getSkewColPosition(),
+ standObjs.get(posPair.getTblColPosition()).toString());
+ }
+ /* The row matches skewed column names. */
+ if (allSkewedVals.contains(skewedValsCandidate)) {
+ /* matches skewed values. */
+ lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
+ locationMap.put(new SkewedValueList(skewedValsCandidate), lbDirName);
+ } else {
+ /* create default directory. */
+ lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
+ lbCtx.getDefaultDirName());
+ List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
+ if (!locationMap.containsKey(defaultKey)) {
+ locationMap.put(new SkewedValueList(defaultKey), lbDirName);
+ }
+ }
+ return lbDirName;
+ }
+
+ /**
* Lookup list bucketing path.
* @param lbDirName
* @return
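For orientation, the new generateListBucketingDirName() pulls the skewed column values out of the row; if that combination is one of the configured skewed values the row is routed to a dedicated sub-directory, otherwise to the default directory, and either choice is recorded in the location map. Below is a minimal, self-contained sketch of that lookup in plain Java. All names and types here are illustrative stand-ins, not Hive's ListBucketingCtx/SkewedValueList API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified sketch of the skewed-value to sub-directory lookup (illustration only).
public class ListBucketingDirSketch {

  private final List<String> skewedColNames;        // e.g. ["key"]
  private final List<List<String>> skewedColValues; // e.g. [["484"], ["51"]]
  private final String defaultDirName;              // shared fallback directory
  private final Map<List<String>, String> locationMap = new HashMap<>();

  public ListBucketingDirSketch(List<String> names, List<List<String>> values,
      String defaultDir) {
    this.skewedColNames = names;
    this.skewedColValues = values;
    this.defaultDirName = defaultDir;
  }

  // Choose the output sub-directory for one row's skewed column values.
  public String dirNameFor(List<String> rowVals) {
    String dirName;
    if (skewedColValues.contains(rowVals)) {
      // Known skewed combination: build "col=value/col=value/...".
      StringBuilder sb = new StringBuilder();
      for (int i = 0; i < skewedColNames.size(); i++) {
        if (i > 0) {
          sb.append('/');
        }
        sb.append(skewedColNames.get(i)).append('=').append(rowVals.get(i));
      }
      dirName = sb.toString();
      locationMap.put(new ArrayList<String>(rowVals), dirName);
    } else {
      // Everything else lands in the default directory, recorded once.
      dirName = defaultDirName;
      List<String> defaultKey = Arrays.asList(defaultDirName);
      if (!locationMap.containsKey(defaultKey)) {
        locationMap.put(defaultKey, dirName);
      }
    }
    return dirName;
  }
}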
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Tue Jun 11 22:03:31 2013
@@ -412,6 +412,9 @@ public class VectorMapOperator extends O
statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);
+ Map<String, Operator<? extends OperatorDesc>> aliasToVectorOpMap =
+ new HashMap<String, Operator<? extends OperatorDesc>>();
+
try {
for (String onefile : conf.getPathToAliases().keySet()) {
MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile, convertedOI);
@@ -440,8 +443,12 @@ public class VectorMapOperator extends O
LOG.info("Adding alias " + onealias + " to work list for file "
+ onefile);
- Operator<? extends OperatorDesc> vectorOp = vectorizeOperator(op,
- vectorizationContext);
+ Operator<? extends OperatorDesc> vectorOp = aliasToVectorOpMap.get(onealias);
+
+ if (vectorOp == null) {
+ vectorOp = vectorizeOperator(op, vectorizationContext);
+ aliasToVectorOpMap.put(onealias, vectorOp);
+ }
System.out.println("Using vectorized op: "+ vectorOp.getName());
LOG.info("Using vectorized op: " + vectorOp.getName());
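The hunk above makes vectorization per alias a one-time cost: the vectorized operator is first looked up in aliasToVectorOpMap and only built (and cached) on a miss, so several files or partitions that map to the same alias share one operator instead of rebuilding it each time. A tiny stand-alone sketch of that lookup-or-create pattern, with plain strings standing in for operators (illustration only, not the Hive API):

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Lookup-or-create cache, mirroring the aliasToVectorOpMap usage above.
public class AliasOpCacheSketch {
  public static void main(String[] args) {
    Map<String, String> aliasToVectorOp = new HashMap<>();
    Function<String, String> vectorize = alias -> "vectorized-op-for-" + alias;

    // Two files that resolve to the same alias get the same cached operator.
    String opForFile1 = aliasToVectorOp.computeIfAbsent("t1", vectorize);
    String opForFile2 = aliasToVectorOp.computeIfAbsent("t1", vectorize);
    System.out.println(opForFile1 == opForFile2); // true: created once, reused
  }
}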
@@ -499,8 +506,7 @@ public class VectorMapOperator extends O
vectorOp = new VectorSelectOperator(vectorizationContext, op.getConf());
break;
case FILESINK:
- vectorOp = new VectorFileSinkOperator(vectorizationContext,
- op.getConf());
+ vectorOp = new VectorFileSinkOperator(vectorizationContext, op.getConf());
break;
case TABLESCAN:
vectorOp = op.cloneOp();
@@ -631,7 +637,25 @@ public class VectorMapOperator extends O
// multiple files/partitions.
@Override
public void cleanUpInputFileChangedOp() throws HiveException {
+ Path fpath = new Path((new Path(this.getExecContext().getCurrentInputFile()))
+ .toUri().getPath());
+
+ for (String onefile : conf.getPathToAliases().keySet()) {
+ Path onepath = new Path(new Path(onefile).toUri().getPath());
+ // check for the operators that will process rows coming to this Map
+ // Operator
+ if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+ String onealias = conf.getPathToAliases().get(onefile).get(0);
+ Operator<? extends OperatorDesc> op =
+ conf.getAliasToWork().get(onealias);
+ LOG.info("Processing alias " + onealias + " for file " + onefile);
+
+ MapInputPath inp = new MapInputPath(onefile, onealias, op);
+ //setInspectorInput(inp);
+ break;
+ }
+ }
}
public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx,
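The new cleanUpInputFileChangedOp() above finds which alias owns the file that just became current by testing whether the file path lives under each configured path. The test !onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri()) relies on java.net.URI.relativize() returning the child URI unchanged whenever the base is not a prefix of it. A small sketch of that idiom with plain URIs (hypothetical paths, not Hadoop Path objects):

import java.net.URI;

// relativize() hands the child back unchanged when the base path is not a
// prefix, so inequality means "this file lives under that configured path".
public class PathPrefixSketch {
  public static void main(String[] args) {
    URI onepath = URI.create("/user/hive/warehouse/t1");
    URI inside = URI.create("/user/hive/warehouse/t1/ds=2013-06-11/000000_0");
    URI outside = URI.create("/user/hive/warehouse/t2/000000_0");

    System.out.println(!onepath.relativize(inside).equals(inside));   // true: prefix match
    System.out.println(!onepath.relativize(outside).equals(outside)); // false: different table
  }
}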
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Jun 11 22:03:31 2013
@@ -279,7 +279,7 @@ public class VectorizedRowBatchCtx {
{
List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
for (int i = 0; i < fieldRefs.size(); i++) {
- if (fieldRefs.get(i).getFieldName() == colName) {
+ if (fieldRefs.get(i).getFieldName().equals(colName)) {
return i;
}
}
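The one-line change above matters because == on String compares object identity, not character content; the field name handed back by an ObjectInspector is generally a different object from the caller's column name, so the old check could miss a genuine match. A minimal illustration:

// Why getFieldName() == colName is unreliable while equals() is not.
public class StringCompareSketch {
  public static void main(String[] args) {
    String colName = "ds";
    // Built at runtime, so it is a distinct object from the literal "ds".
    String fieldName = new StringBuilder("d").append("s").toString();

    System.out.println(fieldName == colName);      // false: different references
    System.out.println(fieldName.equals(colName)); // true: same characters
  }
}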
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java Tue Jun 11 22:03:31 2013
@@ -58,6 +58,7 @@ public class VectorizedRCFileRecordReade
private VectorizedRowBatchCtx rbCtx;
private final LongWritable keyCache = new LongWritable();
private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
+ private boolean addPartitionCols = true;
private static RCFileSyncCache syncCache = new RCFileSyncCache();
@@ -149,10 +150,6 @@ public class VectorizedRCFileRecordReade
VectorizedRowBatch result = null;
try {
result = rbCtx.CreateVectorizedRowBatch();
- // Since the record reader works only on one split and
- // given a split the partition cannot change, we are setting the partition
- // values only once during batch creation
- rbCtx.AddPartitionColsToBatch(result);
} catch (HiveException e) {
new RuntimeException("Error creating a batch", e);
}
@@ -175,9 +172,18 @@ public class VectorizedRCFileRecordReade
int i = 0;
try {
+
for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
more = next(keyCache);
if (more) {
+ // Check and update partition cols if necessary. Ideally this should be done
+ // in CreateValue() as the partition is constant per split. But since Hive uses
+ // CombineHiveRecordReader, which does not call CreateValue() for each new
+ // RecordReader it creates, this check is required in next().
+ if (addPartitionCols) {
+ rbCtx.AddPartitionColsToBatch(value);
+ addPartitionCols = false;
+ }
in.getCurrentRow(colsCache);
// Currently RCFile reader does not support reading vectorized
// data. Populating the batch by adding one row at a time.
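The change above (and the matching one in VectorizedOrcInputFormat below) moves AddPartitionColsToBatch out of batch creation: each reader now stamps the partition column values on its first next() call, guarded by the addPartitionCols flag, because CombineHiveRecordReader reuses one value object across the readers it wraps without calling createValue() for each of them. A simplified, self-contained sketch of that flag-guarded initialization, using hypothetical stand-in types rather than the Hive classes:

// Flag-guarded partition-column setup: once per reader, not once per batch.
public class LazyPartitionColsSketch {

  static class Batch {
    String partitionValue; // stands in for the batch's partition columns
  }

  static class Reader {
    private final String partitionValue;
    private boolean addPartitionCols = true; // one-shot flag, per reader
    private int remainingRows;

    Reader(String partitionValue, int rows) {
      this.partitionValue = partitionValue;
      this.remainingRows = rows;
    }

    // May be handed a batch created by an earlier reader in a combined split;
    // the flag guarantees this reader still writes its own partition value once.
    boolean next(Batch batch) {
      if (remainingRows == 0) {
        return false;
      }
      if (addPartitionCols) {
        batch.partitionValue = partitionValue;
        addPartitionCols = false;
      }
      remainingRows--;
      return true;
    }
  }

  public static void main(String[] args) {
    Batch shared = new Batch(); // one value object shared by both readers
    Reader first = new Reader("ds=2013-06-11", 1);
    Reader second = new Reader("ds=2013-06-12", 1);
    while (first.next(shared)) {
      System.out.println(shared.partitionValue); // ds=2013-06-11
    }
    while (second.next(shared)) {
      System.out.println(shared.partitionValue); // ds=2013-06-12, re-stamped by the flag
    }
  }
}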
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Tue Jun 11 22:03:31 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.exec.ve
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
@@ -46,7 +45,7 @@ import org.apache.hadoop.mapred.Reporter
* A MapReduce/Hive input format for ORC files.
*/
public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
- implements InputFormatChecker, VectorizedInputFormatInterface {
+ implements InputFormatChecker, VectorizedInputFormatInterface {
private static class VectorizedOrcRecordReader
implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -55,6 +54,7 @@ public class VectorizedOrcInputFormat ex
private final long length;
private float progress = 0.0f;
private VectorizedRowBatchCtx rbCtx;
+ private boolean addPartitionCols = true;
VectorizedOrcRecordReader(Reader file, Configuration conf,
FileSplit fileSplit) throws IOException {
@@ -78,10 +78,19 @@ public class VectorizedOrcInputFormat ex
if (!reader.hasNext()) {
return false;
}
- reader.nextBatch(value);
try {
- rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value.size, value);
- } catch (SerDeException e) {
+ // Check and update partition cols if necessary. Ideally, this should be done
+ // in CreateValue as the partition is constant per split. But since Hive uses
+ // CombineHiveRecordReader, which does not call CreateValue for each new
+ // RecordReader it creates, this check is required in next().
+ if (addPartitionCols) {
+ rbCtx.AddPartitionColsToBatch(value);
+ addPartitionCols = false;
+ }
+ reader.nextBatch(value);
+ rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object) value, value.size, value);
+ } catch (Exception e) {
new RuntimeException(e);
}
progress = reader.getProgress();
@@ -98,10 +107,6 @@ public class VectorizedOrcInputFormat ex
VectorizedRowBatch result = null;
try {
result = rbCtx.CreateVectorizedRowBatch();
- // Since the record reader works only on one split and
- // given a split the partition cannot change, we are setting the partition
- // values only once during batch creation
- rbCtx.AddPartitionColsToBatch(result);
} catch (HiveException e) {
new RuntimeException("Error creating a batch", e);
}
@@ -131,29 +136,36 @@ public class VectorizedOrcInputFormat ex
/**
* Recurse down into a type subtree turning on all of the sub-columns.
- * @param types the types of the file
- * @param result the global view of columns that should be included
- * @param typeId the root of tree to enable
+ *
+ * @param types
+ * the types of the file
+ * @param result
+ * the global view of columns that should be included
+ * @param typeId
+ * the root of tree to enable
*/
private static void includeColumnRecursive(List<OrcProto.Type> types,
- boolean[] result,
- int typeId) {
+ boolean[] result,
+ int typeId) {
result[typeId] = true;
OrcProto.Type type = types.get(typeId);
int children = type.getSubtypesCount();
- for(int i=0; i < children; ++i) {
+ for (int i = 0; i < children; ++i) {
includeColumnRecursive(types, result, type.getSubtypes(i));
}
}
/**
* Take the configuration and figure out which columns we need to include.
- * @param types the types of the file
- * @param conf the configuration
+ *
+ * @param types
+ * the types of the file
+ * @param conf
+ * the configuration
* @return true for each column that should be included
*/
private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
- Configuration conf) {
+ Configuration conf) {
String includedStr =
conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
if (includedStr == null || includedStr.trim().length() == 0) {
@@ -164,13 +176,13 @@ public class VectorizedOrcInputFormat ex
result[0] = true;
OrcProto.Type root = types.get(0);
List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
- for(int i=0; i < root.getSubtypesCount(); ++i) {
+ for (int i = 0; i < root.getSubtypesCount(); ++i) {
if (included.contains(i)) {
includeColumnRecursive(types, result, root.getSubtypes(i));
}
}
// if we are filtering at least one column, return the boolean array
- for(boolean include: result) {
+ for (boolean include : result) {
if (!include) {
return result;
}
@@ -182,7 +194,7 @@ public class VectorizedOrcInputFormat ex
@Override
public RecordReader<NullWritable, VectorizedRowBatch>
getRecordReader(InputSplit inputSplit, JobConf conf,
- Reporter reporter) throws IOException {
+ Reporter reporter) throws IOException {
FileSplit fileSplit = (FileSplit) inputSplit;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
@@ -192,8 +204,8 @@ public class VectorizedOrcInputFormat ex
@Override
public boolean validateInput(FileSystem fs, HiveConf conf,
- ArrayList<FileStatus> files
- ) throws IOException {
+ ArrayList<FileStatus> files
+ ) throws IOException {
if (files.size() <= 0) {
return false;
}