Posted to commits@hive.apache.org by ha...@apache.org on 2013/06/12 00:03:32 UTC

svn commit: r1491988 - in /hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql: exec/vector/ io/ io/orc/

Author: hashutosh
Date: Tue Jun 11 22:03:31 2013
New Revision: 1491988

URL: http://svn.apache.org/r1491988
Log:
HIVE-4706 : Query on Table with partition columns fails with AlreadyBeingCreatedException (Sarvesh Sakalanaga via Ashutosh Chauhan)

Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Jun 11 22:03:31 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.exec.v
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.exec.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
@@ -54,6 +56,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.ListBucketingCtx;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
@@ -597,7 +600,7 @@ public class VectorFileSinkOperator exte
       }
     /* Create list bucketing sub-directory only if stored-as-directories is on. */
     String lbDirName = null;
-    //lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
+    lbDirName = (lbCtx == null) ? null : generateListBucketingDirName(row);
 
     FSPaths fpaths;
 
@@ -682,6 +685,52 @@ public class VectorFileSinkOperator exte
   }
 
   /**
+   * Generate list bucketing directory name from a row.
+   * @param row row to process.
+   * @return directory name.
+   */
+  private String generateListBucketingDirName(Object row) {
+    if (!this.isSkewedStoredAsSubDirectories) {
+      return null;
+    }
+
+    String lbDirName = null;
+    List<Object> standObjs = new ArrayList<Object>();
+    List<String> skewedCols = lbCtx.getSkewedColNames();
+    List<List<String>> allSkewedVals = lbCtx.getSkewedColValues();
+    List<String> skewedValsCandidate = null;
+    Map<SkewedValueList, String> locationMap = lbCtx.getLbLocationMap();
+
+    /* Convert input row to standard objects. */
+    ObjectInspectorUtils.copyToStandardObject(standObjs, row,
+        (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
+
+    assert (standObjs.size() >= skewedCols.size()) :
+      "The row has less number of columns than no. of skewed column.";
+
+    skewedValsCandidate = new ArrayList<String>(skewedCols.size());
+    for (SkewedColumnPositionPair posPair : lbCtx.getRowSkewedIndex()) {
+      skewedValsCandidate.add(posPair.getSkewColPosition(),
+          standObjs.get(posPair.getTblColPosition()).toString());
+    }
+    /* The row matches skewed column names. */
+    if (allSkewedVals.contains(skewedValsCandidate)) {
+      /* matches skewed values. */
+      lbDirName = FileUtils.makeListBucketingDirName(skewedCols, skewedValsCandidate);
+      locationMap.put(new SkewedValueList(skewedValsCandidate), lbDirName);
+    } else {
+      /* create default directory. */
+      lbDirName = FileUtils.makeDefaultListBucketingDirName(skewedCols,
+          lbCtx.getDefaultDirName());
+      List<String> defaultKey = Arrays.asList(lbCtx.getDefaultKey());
+      if (!locationMap.containsKey(defaultKey)) {
+        locationMap.put(new SkewedValueList(defaultKey), lbDirName);
+      }
+    }
+    return lbDirName;
+  }
+
+  /**
    * Lookup list bucketing path.
    * @param lbDirName
    * @return
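
A detail worth noting in the default-directory branch of generateListBucketingDirName above: locationMap is keyed by SkewedValueList, but containsKey is called with the raw List<String> defaultKey, so the check can never match an entry stored under a SkewedValueList key, and the put is simply repeated for every default row. A minimal type-consistent sketch, assuming SkewedValueList (a Thrift-generated struct) provides value-based equals/hashCode:

    // Hypothetical variant of the default-key branch: look up with the
    // same type the map stores, so containsKey can actually hit.
    SkewedValueList defaultKeyList = new SkewedValueList(defaultKey);
    if (!locationMap.containsKey(defaultKeyList)) {
      locationMap.put(defaultKeyList, lbDirName);
    }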

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Tue Jun 11 22:03:31 2013
@@ -412,6 +412,9 @@ public class VectorMapOperator extends O
 
     statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
     Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);
+    Map<String, Operator<? extends OperatorDesc>> aliasToVectorOpMap =
+        new HashMap<String, Operator<? extends OperatorDesc>>();
+
     try {
       for (String onefile : conf.getPathToAliases().keySet()) {
         MapOpCtx opCtx = initObjectInspector(conf, hconf, onefile, convertedOI);
@@ -440,8 +443,12 @@ public class VectorMapOperator extends O
           LOG.info("Adding alias " + onealias + " to work list for file "
             + onefile);
 
-          Operator<? extends OperatorDesc> vectorOp = vectorizeOperator(op,
-              vectorizationContext);
+          Operator<? extends OperatorDesc> vectorOp = aliasToVectorOpMap.get(onealias);
+
+          if (vectorOp == null) {
+            vectorOp = vectorizeOperator(op, vectorizationContext);
+            aliasToVectorOpMap.put(onealias, vectorOp);
+          }
 
           System.out.println("Using vectorized op: "+ vectorOp.getName());
           LOG.info("Using vectorized op: " + vectorOp.getName());
@@ -499,8 +506,7 @@ public class VectorMapOperator extends O
         vectorOp = new VectorSelectOperator(vectorizationContext, op.getConf());
         break;
       case FILESINK:
-        vectorOp = new VectorFileSinkOperator(vectorizationContext,
-            op.getConf());
+        vectorOp = new VectorFileSinkOperator(vectorizationContext, op.getConf());
         break;
       case TABLESCAN:
         vectorOp = op.cloneOp();
@@ -631,7 +637,25 @@ public class VectorMapOperator extends O
   // multiple files/partitions.
   @Override
   public void cleanUpInputFileChangedOp() throws HiveException {
+    Path fpath = new Path((new Path(this.getExecContext().getCurrentInputFile()))
+        .toUri().getPath());
+
+    for (String onefile : conf.getPathToAliases().keySet()) {
+      Path onepath = new Path(new Path(onefile).toUri().getPath());
+      // check for the operators that will process rows coming to this Map
+      // Operator
+      if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
+        String onealias = conf.getPathToAliases().get(onefile).get(0);
+        Operator<? extends OperatorDesc> op =
+            conf.getAliasToWork().get(onealias);
 
+        LOG.info("Processing alias " + onealias + " for file " + onefile);
+
+        MapInputPath inp = new MapInputPath(onefile, onealias, op);
+        //setInspectorInput(inp);
+        break;
+      }
+    }
   }
 
   public static Writable[] populateVirtualColumnValues(ExecMapperContext ctx,
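
The path matching in cleanUpInputFileChangedOp relies on a subtlety of java.net.URI.relativize: when the base URI is not an ancestor of the argument, relativize returns the argument unchanged, so "result differs from the input" means onepath is a path prefix of fpath. The check is also segment-safe: /warehouse/t1 will not claim /warehouse/t10/part-00000. A self-contained illustration (paths hypothetical):

    import java.net.URI;

    public class RelativizeDemo {
      public static void main(String[] args) {
        URI base  = URI.create("/warehouse/t1");
        URI child = URI.create("/warehouse/t1/part-00000");
        URI other = URI.create("/warehouse/t10/part-00000");
        // ancestor: relativize strips the prefix, so result != input
        System.out.println(!base.relativize(child).equals(child)); // true
        // not an ancestor: relativize returns its argument unchanged
        System.out.println(!base.relativize(other).equals(other)); // false
      }
    }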

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Jun 11 22:03:31 2013
@@ -279,7 +279,7 @@ public class VectorizedRowBatchCtx {
   {
     List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs();
     for (int i = 0; i < fieldRefs.size(); i++) {
-      if (fieldRefs.get(i).getFieldName() == colName) {
+      if (fieldRefs.get(i).getFieldName().equals(colName)) {
         return i;
       }
     }
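
This one-character fix addresses a classic Java pitfall: == on String compares object identity, not contents, and a field name surfaced by an ObjectInspector is not guaranteed to be interned, so the old test could miss a column that is textually present. For illustration:

    String a = "ds";
    String b = new String("ds");      // same contents, distinct object
    System.out.println(a == b);       // false: reference comparison
    System.out.println(a.equals(b));  // true:  content comparison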

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java Tue Jun 11 22:03:31 2013
@@ -58,6 +58,7 @@ public class VectorizedRCFileRecordReade
   private VectorizedRowBatchCtx rbCtx;
   private final LongWritable keyCache = new LongWritable();
   private final BytesRefArrayWritable colsCache = new BytesRefArrayWritable();
+  private boolean addPartitionCols = true;
 
   private static RCFileSyncCache syncCache = new RCFileSyncCache();
 
@@ -149,10 +150,6 @@ public class VectorizedRCFileRecordReade
     VectorizedRowBatch result = null;
     try {
       result = rbCtx.CreateVectorizedRowBatch();
-      // Since the record reader works only on one split and
-      // given a split the partition cannot change, we are setting the partition
-      // values only once during batch creation
-      rbCtx.AddPartitionColsToBatch(result);
     } catch (HiveException e) {
       new RuntimeException("Error creating a batch", e);
     }
@@ -175,9 +172,18 @@ public class VectorizedRCFileRecordReade
 
     int i = 0;
     try {
+
       for (; i < VectorizedRowBatch.DEFAULT_SIZE; i++) {
         more = next(keyCache);
         if (more) {
+          // Check and update partition cols if necessary. Ideally this should be done
+          // in CreateValue() as the partition is constant per split. But since Hive uses
+          // CombineHiveRecordReader and as this does not call CreateValue() for
+          // each new RecordReader it creates, this check is required in next()
+          if (addPartitionCols) {
+            rbCtx.AddPartitionColsToBatch(value);
+            addPartitionCols = false;
+          }
           in.getCurrentRow(colsCache);
           // Currently RCFile reader does not support reading vectorized
           // data. Populating the batch by adding one row at a time.
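
These two hunks move the partition-column setup out of the batch-creation path and into a guarded block in next(), for the reason the in-code comment gives: CombineHiveRecordReader does not call CreateValue() on each record reader it chains, so once-per-split work cannot live there. The skeleton of the pattern in isolation (readRowsInto is a hypothetical stand-in for the reader's normal work):

    // Defer once-per-split setup to the first next() call, since the
    // enclosing combine reader may never call createValue() on us.
    private boolean addPartitionCols = true;

    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
      if (addPartitionCols) {
        try {
          rbCtx.AddPartitionColsToBatch(value); // constant per split: do once
        } catch (HiveException e) {
          throw new IOException("Error adding partition columns", e);
        }
        addPartitionCols = false;
      }
      return readRowsInto(key, value);
    }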

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1491988&r1=1491987&r2=1491988&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Tue Jun 11 22:03:31 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
@@ -46,7 +45,7 @@ import org.apache.hadoop.mapred.Reporter
  * A MapReduce/Hive input format for ORC files.
  */
 public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
-  implements InputFormatChecker, VectorizedInputFormatInterface {
+    implements InputFormatChecker, VectorizedInputFormatInterface {
 
   private static class VectorizedOrcRecordReader
       implements RecordReader<NullWritable, VectorizedRowBatch> {
@@ -55,6 +54,7 @@ public class VectorizedOrcInputFormat ex
     private final long length;
     private float progress = 0.0f;
     private VectorizedRowBatchCtx rbCtx;
+    private boolean addPartitionCols = true;
 
     VectorizedOrcRecordReader(Reader file, Configuration conf,
         FileSplit fileSplit) throws IOException {
@@ -78,10 +78,19 @@ public class VectorizedOrcInputFormat ex
       if (!reader.hasNext()) {
         return false;
       }
-      reader.nextBatch(value);
       try {
-        rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object)value, value.size, value);
-      } catch (SerDeException e) {
+        // Check and update partition cols if necessary. Ideally, this should be done
+        // in CreateValue as the partition is constant per split. But since Hive uses
+        // CombineHiveRecordReader and
+        // as this does not call CreateValue for each new RecordReader it creates, this check is
+        // required in next()
+        if (addPartitionCols) {
+          rbCtx.AddPartitionColsToBatch(value);
+          addPartitionCols = false;
+        }
+        reader.nextBatch(value);
+        rbCtx.ConvertRowBatchBlobToVectorizedBatch((Object) value, value.size, value);
+      } catch (Exception e) {
         new RuntimeException(e);
       }
       progress = reader.getProgress();
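
A caveat worth flagging in the catch block above (and in the pre-existing CreateValue handlers of both readers): the RuntimeException is constructed but never thrown, so a failure in AddPartitionColsToBatch or ConvertRowBatchBlobToVectorizedBatch is silently discarded and the caller continues with a possibly half-populated batch. If propagation was the intent, the handler would read:

      } catch (Exception e) {
        throw new RuntimeException(e); // propagate instead of discarding
      }
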
@@ -98,10 +107,6 @@ public class VectorizedOrcInputFormat ex
       VectorizedRowBatch result = null;
       try {
         result = rbCtx.CreateVectorizedRowBatch();
-        // Since the record reader works only on one split and
-        // given a split the partition cannot change, we are setting the partition
-        // values only once during batch creation
-        rbCtx.AddPartitionColsToBatch(result);
       } catch (HiveException e) {
         new RuntimeException("Error creating a batch", e);
       }
@@ -131,29 +136,36 @@ public class VectorizedOrcInputFormat ex
 
   /**
    * Recurse down into a type subtree turning on all of the sub-columns.
-   * @param types the types of the file
-   * @param result the global view of columns that should be included
-   * @param typeId the root of tree to enable
+   *
+   * @param types
+   *          the types of the file
+   * @param result
+   *          the global view of columns that should be included
+   * @param typeId
+   *          the root of tree to enable
    */
   private static void includeColumnRecursive(List<OrcProto.Type> types,
-                                             boolean[] result,
-                                             int typeId) {
+      boolean[] result,
+      int typeId) {
     result[typeId] = true;
     OrcProto.Type type = types.get(typeId);
     int children = type.getSubtypesCount();
-    for(int i=0; i < children; ++i) {
+    for (int i = 0; i < children; ++i) {
       includeColumnRecursive(types, result, type.getSubtypes(i));
     }
   }
 
   /**
    * Take the configuration and figure out which columns we need to include.
-   * @param types the types of the file
-   * @param conf the configuration
+   *
+   * @param types
+   *          the types of the file
+   * @param conf
+   *          the configuration
    * @return true for each column that should be included
    */
   private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
-                                               Configuration conf) {
+      Configuration conf) {
     String includedStr =
         conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
     if (includedStr == null || includedStr.trim().length() == 0) {
@@ -164,13 +176,13 @@ public class VectorizedOrcInputFormat ex
       result[0] = true;
       OrcProto.Type root = types.get(0);
       List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
-      for(int i=0; i < root.getSubtypesCount(); ++i) {
+      for (int i = 0; i < root.getSubtypesCount(); ++i) {
         if (included.contains(i)) {
           includeColumnRecursive(types, result, root.getSubtypes(i));
         }
       }
       // if we are filtering at least one column, return the boolean array
-      for(boolean include: result) {
+      for (boolean include : result) {
         if (!include) {
           return result;
         }
@@ -182,7 +194,7 @@ public class VectorizedOrcInputFormat ex
   @Override
   public RecordReader<NullWritable, VectorizedRowBatch>
       getRecordReader(InputSplit inputSplit, JobConf conf,
-                      Reporter reporter) throws IOException {
+          Reporter reporter) throws IOException {
     FileSplit fileSplit = (FileSplit) inputSplit;
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
@@ -192,8 +204,8 @@ public class VectorizedOrcInputFormat ex
 
   @Override
   public boolean validateInput(FileSystem fs, HiveConf conf,
-                               ArrayList<FileStatus> files
-                              ) throws IOException {
+      ArrayList<FileStatus> files
+      ) throws IOException {
     if (files.size() <= 0) {
       return false;
     }