You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 06:00:54 UTC

svn commit: r1629563 [6/33] - in /hive/branches/spark: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ contri...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Mon Oct  6 04:00:39 2014
@@ -29,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -38,13 +40,13 @@ import org.apache.hadoop.hive.common.Sta
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.RecordUpdater;
-import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
 import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
+import org.apache.hadoop.hive.ql.io.RecordUpdater;
+import org.apache.hadoop.hive.ql.io.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveFatalException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -72,14 +74,16 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import com.google.common.collect.Lists;
-
 /**
  * File Sink operator implementation.
  **/
 public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
     Serializable {
 
+  public static final Log LOG = LogFactory.getLog(FileSinkOperator.class);
+  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
+  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+
   protected transient HashMap<String, FSPaths> valToPaths;
   protected transient int numDynParts;
   protected transient List<String> dpColNames;
@@ -101,10 +105,6 @@ public class FileSinkOperator extends Te
   protected transient boolean isCollectRWStats;
   private transient FSPaths prevFsp;
   private transient FSPaths fpaths;
-  private transient ObjectInspector keyOI;
-  private transient List<Object> keyWritables;
-  private transient List<String> keys;
-  private transient int numKeyColToRead;
   private StructField recIdField; // field to find record identifier in
   private StructField bucketField; // field bucket is in in record id
   private StructObjectInspector recIdInspector; // OI for inspecting record id
@@ -131,9 +131,6 @@ public class FileSinkOperator extends Te
     int acidLastBucket = -1;
     int acidFileOffset = -1;
 
-    public FSPaths() {
-    }
-
     public FSPaths(Path specPath) {
       tmpPath = Utilities.toTempPath(specPath);
       taskOutputTempPath = Utilities.toTaskTempPath(specPath);
@@ -141,7 +138,9 @@ public class FileSinkOperator extends Te
       finalPaths = new Path[numFiles];
       outWriters = new RecordWriter[numFiles];
       updaters = new RecordUpdater[numFiles];
-      LOG.debug("Created slots for  " + numFiles);
+      if (isDebugEnabled) {
+        LOG.debug("Created slots for  " + numFiles);
+      }
       stat = new Stat();
     }
 
@@ -326,7 +325,6 @@ public class FileSinkOperator extends Te
       parent = Utilities.toTempPath(conf.getDirName());
       statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
       statsFromRecordWriter = new boolean[numFiles];
-
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
       serializer.initialize(null, conf.getTableInfo().getProperties());
       outputClass = serializer.getSerializedClass();
@@ -363,20 +361,6 @@ public class FileSinkOperator extends Te
         lbSetup();
       }
 
-      int numPart = 0;
-      int numBuck = 0;
-      if (conf.getPartitionCols() != null && !conf.getPartitionCols().isEmpty()) {
-        numPart = conf.getPartitionCols().size();
-      }
-
-      // bucket number will exists only in PARTITION_BUCKET_SORTED mode
-      if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
-        numBuck = 1;
-      }
-      numKeyColToRead = numPart + numBuck;
-      keys = Lists.newArrayListWithCapacity(numKeyColToRead);
-      keyWritables = Lists.newArrayListWithCapacity(numKeyColToRead);
-
       if (!bDynParts) {
         fsp = new FSPaths(specPath);
 
@@ -423,7 +407,8 @@ public class FileSinkOperator extends Te
     this.dpColNames = dpCtx.getDPColNames();
     this.maxPartitions = dpCtx.getMaxPartitionsPerNode();
 
-    assert numDynParts == dpColNames.size() : "number of dynamic paritions should be the same as the size of DP mapping";
+    assert numDynParts == dpColNames.size()
+        : "number of dynamic paritions should be the same as the size of DP mapping";
 
     if (dpColNames != null && dpColNames.size() > 0) {
       this.bDynParts = true;
@@ -441,6 +426,9 @@ public class FileSinkOperator extends Te
           newFieldsOI.add(sf.getFieldObjectInspector());
           newFieldsName.add(sf.getFieldName());
           this.dpStartCol++;
+        } else {
+          // once we found the start column for partition column we are done
+          break;
         }
       }
       assert newFieldsOI.size() > 0 : "new Fields ObjectInspector is empty";
@@ -457,11 +445,15 @@ public class FileSinkOperator extends Te
       Set<Integer> seenBuckets = new HashSet<Integer>();
       for (int idx = 0; idx < totalFiles; idx++) {
         if (this.getExecContext() != null && this.getExecContext().getFileId() != null) {
-          LOG.info("replace taskId from execContext ");
+          if (isInfoEnabled) {
+            LOG.info("replace taskId from execContext ");
+          }
 
           taskId = Utilities.replaceTaskIdFromFilename(taskId, this.getExecContext().getFileId());
 
-          LOG.info("new taskId: FS " + taskId);
+          if (isInfoEnabled) {
+            LOG.info("new taskId: FS " + taskId);
+          }
 
           assert !multiFileSpray;
           assert totalFiles == 1;
@@ -515,9 +507,13 @@ public class FileSinkOperator extends Te
     try {
       if (isNativeTable) {
         fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, null);
-        LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
+        if (isInfoEnabled) {
+          LOG.info("Final Path: FS " + fsp.finalPaths[filesIdx]);
+        }
         fsp.outPaths[filesIdx] = fsp.getTaskOutPath(taskId);
-        LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
+        if (isInfoEnabled) {
+          LOG.info("Writing to temp file: FS " + fsp.outPaths[filesIdx]);
+        }
       } else {
         fsp.finalPaths[filesIdx] = fsp.outPaths[filesIdx] = specPath;
       }
@@ -532,7 +528,9 @@ public class FileSinkOperator extends Te
         fsp.finalPaths[filesIdx] = fsp.getFinalPath(taskId, fsp.tmpPath, extension);
       }
 
-      LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
+      if (isInfoEnabled) {
+        LOG.info("New Final Path: FS " + fsp.finalPaths[filesIdx]);
+      }
 
       if (isNativeTable) {
         // in recent hadoop versions, use deleteOnExit to clean tmp files.
@@ -604,14 +602,22 @@ public class FileSinkOperator extends Te
       updateProgress();
 
       // if DP is enabled, get the final output writers and prepare the real output row
-      assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT : "input object inspector is not struct";
+      assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT
+          : "input object inspector is not struct";
 
       if (bDynParts) {
+
+        // we need to read bucket number which is the last column in value (after partition columns)
+        if (conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
+          numDynParts += 1;
+        }
+
         // copy the DP column values from the input row to dpVals
         dpVals.clear();
         dpWritables.clear();
-        ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol, numDynParts,
-            (StructObjectInspector) inputObjInspectors[0], ObjectInspectorCopyOption.WRITABLE);
+        ObjectInspectorUtils.partialCopyToStandardObject(dpWritables, row, dpStartCol,numDynParts,
+            (StructObjectInspector) inputObjInspectors[0],ObjectInspectorCopyOption.WRITABLE);
+
         // get a set of RecordWriter based on the DP column values
         // pass the null value along to the escaping process to determine what the dir should be
         for (Object o : dpWritables) {
@@ -621,16 +627,11 @@ public class FileSinkOperator extends Te
             dpVals.add(o.toString());
           }
         }
-        // use SubStructObjectInspector to serialize the non-partitioning columns in the input row
-        recordValue = serializer.serialize(row, subSetOI);
 
-        // when dynamic partition sorting is not used, the DPSortState will be NONE
-        // in which we will fall back to old method of file system path creation
-        // i.e, having as many record writers as distinct values in partition column
-        if (conf.getDpSortState().equals(DPSortState.NONE)) {
-          fpaths = getDynOutPaths(dpVals, lbDirName);
-        }
+        fpaths = getDynOutPaths(dpVals, lbDirName);
 
+        // use SubStructObjectInspector to serialize the non-partitioning columns in the input row
+        recordValue = serializer.serialize(row, subSetOI);
       } else {
         if (lbDirName != null) {
           fpaths = lookupListBucketingPaths(lbDirName);
@@ -686,8 +687,10 @@ public class FileSinkOperator extends Te
           fpaths.updaters[++fpaths.acidFileOffset] = HiveFileFormatUtils.getAcidRecordUpdater(
               jc, conf.getTableInfo(), bucketNum, conf, fpaths.outPaths[fpaths.acidFileOffset],
               rowInspector, reporter, 0);
-          LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
-              fpaths.outPaths[fpaths.acidFileOffset]);
+          if (isDebugEnabled) {
+            LOG.debug("Created updater for bucket number " + bucketNum + " using file " +
+                fpaths.outPaths[fpaths.acidFileOffset]);
+          }
         }
 
         if (conf.getWriteType() == AcidUtils.Operation.UPDATE) {
@@ -834,10 +837,8 @@ public class FileSinkOperator extends Te
     if (dpDir != null) {
       dpDir = appendToSource(lbDirName, dpDir);
       pathKey = dpDir;
-      int numericBucketNum = 0;
       if(conf.getDpSortState().equals(DPSortState.PARTITION_BUCKET_SORTED)) {
         String buckNum = row.get(row.size() - 1);
-        numericBucketNum = Integer.valueOf(buckNum);
         taskId = Utilities.replaceTaskIdFromFilename(Utilities.getTaskId(hconf), buckNum);
         pathKey = appendToSource(taskId, dpDir);
       }
@@ -918,26 +919,6 @@ public class FileSinkOperator extends Te
   }
 
   @Override
-  public void startGroup() throws HiveException {
-    if (!conf.getDpSortState().equals(DPSortState.NONE)) {
-      keyOI = getGroupKeyObjectInspector();
-      keys.clear();
-      keyWritables.clear();
-      ObjectInspectorUtils.partialCopyToStandardObject(keyWritables, getGroupKeyObject(), 0,
-          numKeyColToRead, (StructObjectInspector) keyOI, ObjectInspectorCopyOption.WRITABLE);
-
-      for (Object o : keyWritables) {
-        if (o == null || o.toString().length() == 0) {
-          keys.add(dpCtx.getDefaultPartitionName());
-        } else {
-          keys.add(o.toString());
-        }
-      }
-      fpaths = getDynOutPaths(keys, null);
-    }
-  }
-
-  @Override
   public void closeOp(boolean abort) throws HiveException {
 
     if (!bDynParts && !filesCreated) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Mon Oct  6 04:00:39 2014
@@ -76,7 +76,7 @@ public class FilterOperator extends Oper
       statsMap.put(Counter.FILTERED, filtered_count);
       statsMap.put(Counter.PASSED, passed_count);
       conditionInspector = null;
-      ioContext = IOContext.get();
+      ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME));
     } catch (Throwable e) {
       throw new HiveException(e);
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Mon Oct  6 04:00:39 2014
@@ -639,6 +639,14 @@ public final class FunctionRegistry {
     }
   }
 
+  public static String getNormalizedFunctionName(String fn) {
+    // Normalizes the name the same way getFunctionInfo does, but does not fetch the function info.
+    String name = fn.toLowerCase();
+    boolean asIs = FunctionUtils.isQualifiedFunctionName(name) || mFunctions.get(name) != null;
+    return asIs ? name : FunctionUtils.qualifyFunctionName(
+        name, SessionState.get().getCurrentDatabase().toLowerCase());
+  }
+
   private static <T extends CommonFunctionInfo> T getFunctionInfo(
       Map<String, T> mFunctions, String functionName) {
     functionName = functionName.toLowerCase();
@@ -861,15 +869,7 @@ public final class FunctionRegistry {
             TypeInfoUtils.getCharacterLengthForType(b));
         return TypeInfoFactory.getVarcharTypeInfo(maxLength);
       case DECIMAL:
-          int prec1 = HiveDecimalUtils.getPrecisionForType(a);
-          int prec2 = HiveDecimalUtils.getPrecisionForType(b);
-          int scale1 = HiveDecimalUtils.getScaleForType(a);
-          int scale2 = HiveDecimalUtils.getScaleForType(b);
-          int intPart = Math.max(prec1 - scale1, prec2 - scale2);
-          int decPart = Math.max(scale1, scale2);
-          int prec =  Math.min(intPart + decPart, HiveDecimal.MAX_PRECISION);
-          int scale = Math.min(decPart, HiveDecimal.MAX_PRECISION - intPart);
-          return TypeInfoFactory.getDecimalTypeInfo(prec, scale);
+      return HiveDecimalUtils.getDecimalTypeForPrimitiveCategories(a, b);
       default:
         // Type doesn't require any qualifiers.
         return TypeInfoFactory.getPrimitiveTypeInfo(

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Mon Oct  6 04:00:39 2014
@@ -77,6 +77,7 @@ public class GroupByOperator extends Ope
 
   private static final Log LOG = LogFactory.getLog(GroupByOperator.class
       .getName());
+  private static final boolean isTraceEnabled = LOG.isTraceEnabled();
   private static final long serialVersionUID = 1L;
   private static final int NUMROWSESTIMATESIZE = 1000;
 
@@ -101,6 +102,7 @@ public class GroupByOperator extends Ope
   transient ExprNodeEvaluator unionExprEval = null;
 
   transient GenericUDAFEvaluator[] aggregationEvaluators;
+  transient boolean[] estimableAggregationEvaluators;
 
   protected transient ArrayList<ObjectInspector> objectInspectors;
   transient ArrayList<String> fieldNames;
@@ -442,10 +444,10 @@ public class GroupByOperator extends Ope
     estimateRowSize();
   }
 
-  private static final int javaObjectOverHead = 64;
-  private static final int javaHashEntryOverHead = 64;
-  private static final int javaSizePrimitiveType = 16;
-  private static final int javaSizeUnknownType = 256;
+  public static final int javaObjectOverHead = 64;
+  public static final int javaHashEntryOverHead = 64;
+  public static final int javaSizePrimitiveType = 16;
+  public static final int javaSizeUnknownType = 256;
 
   /**
    * The size of the element at position 'pos' is returned, if possible. If the
@@ -557,11 +559,13 @@ public class GroupByOperator extends Ope
     // Go over all the aggregation classes and and get the size of the fields of
     // fixed length. Keep track of the variable length
     // fields in these aggregation classes.
+    estimableAggregationEvaluators = new boolean[aggregationEvaluators.length];
     for (int i = 0; i < aggregationEvaluators.length; i++) {
 
       fixedRowSize += javaObjectOverHead;
       AggregationBuffer agg = aggregationEvaluators[i].getNewAggregationBuffer();
       if (GenericUDAFEvaluator.isEstimable(agg)) {
+        estimableAggregationEvaluators[i] = true;
         continue;
       }
       Field[] fArr = ObjectInspectorUtils.getDeclaredNonStaticFields(agg.getClass());
@@ -765,10 +769,12 @@ public class GroupByOperator extends Ope
           flushHashTable(true);
           hashAggr = false;
         } else {
-          LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
-              + " #total = " + numRowsInput + " reduction = " + 1.0
-              * (numRowsHashTbl / numRowsInput) + " minReduction = "
-              + minReductionHashAggr);
+          if (isTraceEnabled) {
+            LOG.trace("Hash Aggr Enabled: #hash table = " + numRowsHashTbl
+                + " #total = " + numRowsInput + " reduction = " + 1.0
+                * (numRowsHashTbl / numRowsInput) + " minReduction = "
+                + minReductionHashAggr);
+          }
         }
       }
     }
@@ -952,7 +958,7 @@ public class GroupByOperator extends Ope
       AggregationBuffer[] aggs = hashAggregations.get(newKeys);
       for (int i = 0; i < aggs.length; i++) {
         AggregationBuffer agg = aggs[i];
-        if (GenericUDAFEvaluator.isEstimable(agg)) {
+        if (estimableAggregationEvaluators[i]) {
           totalVariableSize += ((GenericUDAFEvaluator.AbstractAggregationBuffer)agg).estimate();
           continue;
         }
@@ -966,8 +972,10 @@ public class GroupByOperator extends Ope
       // Update the number of entries that can fit in the hash table
       numEntriesHashTable =
           (int) (maxHashTblMemory / (fixedRowSize + (totalVariableSize / numEntriesVarSize)));
-      LOG.trace("Hash Aggr: #hash table = " + numEntries
-          + " #max in hash table = " + numEntriesHashTable);
+      if (isTraceEnabled) {
+        LOG.trace("Hash Aggr: #hash table = " + numEntries
+            + " #max in hash table = " + numEntriesHashTable);
+      }
     }
 
     // flush if necessary

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Mon Oct  6 04:00:39 2014
@@ -171,8 +171,9 @@ public class MapJoinOperator extends Abs
 
   private void loadHashTable() throws HiveException {
 
-    if (this.getExecContext().getLocalWork() == null
-        || !this.getExecContext().getLocalWork().getInputFileChangeSensitive()) {
+    if ((this.getExecContext() != null)
+        && ((this.getExecContext().getLocalWork() == null) || (!this.getExecContext()
+            .getLocalWork().getInputFileChangeSensitive()))) {
       if (hashTblInitedOnce) {
         return;
       } else {
@@ -313,8 +314,8 @@ public class MapJoinOperator extends Abs
         tableContainer.dumpMetrics();
       }
     }
-    if ((this.getExecContext().getLocalWork() != null
-        && this.getExecContext().getLocalWork().getInputFileChangeSensitive())
+    if ((this.getExecContext() != null) && (this.getExecContext().getLocalWork() != null)
+        && (this.getExecContext().getLocalWork().getInputFileChangeSensitive())
         && mapJoinTables != null) {
       for (MapJoinTableContainer tableContainer : mapJoinTables) {
         if (tableContainer != null) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Mon Oct  6 04:00:39 2014
@@ -33,9 +33,10 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.tez.MapRecordProcessor;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -181,7 +182,7 @@ public class MapOperator extends Operato
 
     PartitionDesc pd = ctx.partDesc;
     TableDesc td = pd.getTableDesc();
-    
+
     MapOpCtx opCtx = new MapOpCtx();
     // Use table properties in case of unpartitioned tables,
     // and the union of table properties and partition properties, with partition
@@ -205,42 +206,42 @@ public class MapOperator extends Operato
 
     opCtx.partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
         partRawRowObjectInspector, opCtx.tblRawRowObjectInspector);
-    
+
     // Next check if this table has partitions and if so
     // get the list of partition names as well as allocate
     // the serdes for the partition columns
     String pcols = overlayedProps.getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMNS);
-    
+
     if (pcols != null && pcols.length() > 0) {
       String[] partKeys = pcols.trim().split("/");
       String pcolTypes = overlayedProps
           .getProperty(hive_metastoreConstants.META_TABLE_PARTITION_COLUMN_TYPES);
       String[] partKeyTypes = pcolTypes.trim().split(":");
-      
+
       if (partKeys.length > partKeyTypes.length) {
           throw new HiveException("Internal error : partKeys length, " +partKeys.length +
                   " greater than partKeyTypes length, " + partKeyTypes.length);
       }
-      
+
       List<String> partNames = new ArrayList<String>(partKeys.length);
       Object[] partValues = new Object[partKeys.length];
       List<ObjectInspector> partObjectInspectors = new ArrayList<ObjectInspector>(partKeys.length);
-      
+
       for (int i = 0; i < partKeys.length; i++) {
         String key = partKeys[i];
         partNames.add(key);
         ObjectInspector oi = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector
             (TypeInfoFactory.getPrimitiveTypeInfo(partKeyTypes[i]));
-        
+
         // Partitions do not exist for this table
         if (partSpec == null) {
           // for partitionless table, initialize partValue to null
           partValues[i] = null;
         } else {
-            partValues[i] = 
+            partValues[i] =
                 ObjectInspectorConverters.
                 getConverter(PrimitiveObjectInspectorFactory.
-                    javaStringObjectInspector, oi).convert(partSpec.get(key)); 
+                    javaStringObjectInspector, oi).convert(partSpec.get(key));
         }
         partObjectInspectors.add(oi);
       }
@@ -337,13 +338,8 @@ public class MapOperator extends Operato
     return tableDescOI;
   }
 
-  private boolean isPartitioned(PartitionDesc pd) {
-    return pd.getPartSpec() != null && !pd.getPartSpec().isEmpty();
-  }
-
   public void setChildren(Configuration hconf) throws HiveException {
-
-    Path fpath = IOContext.get().getInputPath();
+    Path fpath = IOContext.get(hconf.get(Utilities.INPUT_NAME)).getInputPath();
 
     boolean schemeless = fpath.toUri().getScheme() == null;
 
@@ -639,4 +635,8 @@ public class MapOperator extends Operato
     return null;
   }
 
+  @Override
+  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
+    return MapRecordProcessor.getConnectOps(); // static lookup on the Tez map record processor
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Mon Oct  6 04:00:39 2014
@@ -353,6 +353,7 @@ public class MoveTask extends Task<MoveW
               pushFeed(FeedType.DYNAMIC_PARTITIONS, dps);
             }
 
+            long startTime = System.currentTimeMillis();
             // load the list of DP partitions and return the list of partition specs
             // TODO: In a follow-up to HIVE-1361, we should refactor loadDynamicPartitions
             // to use Utilities.getFullDPSpecs() to get the list of full partSpecs.
@@ -360,7 +361,7 @@ public class MoveTask extends Task<MoveW
             // iterate over it and call loadPartition() here.
             // The reason we don't do inside HIVE-1361 is the latter is large and we
             // want to isolate any potential issue it may introduce.
-            ArrayList<LinkedHashMap<String, String>> dp =
+            Map<Map<String, String>, Partition> dp =
               db.loadDynamicPartitions(
                 tbd.getSourcePath(),
                 tbd.getTable().getTableName(),
@@ -370,16 +371,19 @@ public class MoveTask extends Task<MoveW
                 tbd.getHoldDDLTime(),
                 isSkewedStoredAsDirs(tbd),
                 work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID);
+            console.printInfo("\t Time taken for load dynamic partitions : "  +
+                (System.currentTimeMillis() - startTime));
 
             if (dp.size() == 0 && conf.getBoolVar(HiveConf.ConfVars.HIVE_ERROR_ON_EMPTY_PARTITION)) {
               throw new HiveException("This query creates no partitions." +
                   " To turn off this error, set hive.error.on.empty.partition=false.");
             }
 
+            startTime = System.currentTimeMillis();
             // for each partition spec, get the partition
             // and put it to WriteEntity for post-exec hook
-            for (LinkedHashMap<String, String> partSpec: dp) {
-              Partition partn = db.getPartition(table, partSpec, false);
+            for(Map.Entry<Map<String, String>, Partition> entry : dp.entrySet()) {
+              Partition partn = entry.getValue();
 
               if (bucketCols != null || sortCols != null) {
                 updatePartitionBucketSortColumns(table, partn, bucketCols, numBuckets, sortCols);
@@ -412,8 +416,10 @@ public class MoveTask extends Task<MoveW
                     table.getCols());
               }
 
-              console.printInfo("\tLoading partition " + partSpec);
+              console.printInfo("\tLoading partition " + entry.getKey());
             }
+            console.printInfo("\t Time taken for adding to write entity : " +
+                (System.currentTimeMillis() - startTime));
             dc = null; // reset data container to prevent it being added again.
           } else { // static partitions
             List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Mon Oct  6 04:00:39 2014
@@ -146,6 +146,7 @@ public abstract class Operator<T extends
   /**
    * Implements the getChildren function for the Node Interface.
    */
+  @Override
   public ArrayList<Node> getChildren() {
 
     if (getChildOperators() == null) {
@@ -497,8 +498,6 @@ public abstract class Operator<T extends
 
     LOG.debug("Starting group for children:");
     for (Operator<? extends OperatorDesc> op : childOperators) {
-      op.setGroupKeyObjectInspector(groupKeyOI);
-      op.setGroupKeyObject(groupKeyObject);
       op.startGroup();
     }
 
@@ -851,6 +850,7 @@ public abstract class Operator<T extends
    *
    * @return the name of the operator
    */
+  @Override
   public String getName() {
     return getOperatorName();
   }
@@ -968,7 +968,6 @@ public abstract class Operator<T extends
   }
 
   protected transient Object groupKeyObject;
-  protected transient ObjectInspector groupKeyOI;
 
   public String getOperatorId() {
     return operatorId;
@@ -1061,7 +1060,7 @@ public abstract class Operator<T extends
 
     if (parents != null) {
       for (Operator<? extends OperatorDesc> parent : parents) {
-        parentClones.add((Operator<? extends OperatorDesc>)(parent.clone()));
+        parentClones.add((parent.clone()));
       }
     }
 
@@ -1082,8 +1081,8 @@ public abstract class Operator<T extends
   public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
     T descClone = (T) conf.clone();
     Operator<? extends OperatorDesc> ret =
-        (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
-            descClone, getSchema());
+        OperatorFactory.getAndMakeChild(
+        descClone, getSchema());
     return ret;
   }
 
@@ -1254,15 +1253,15 @@ public abstract class Operator<T extends
     }
     return null;
   }
-  
+
   public OpTraits getOpTraits() {
     if (conf != null) {
       return conf.getOpTraits();
     }
-    
+
     return null;
   }
-  
+
   public void setOpTraits(OpTraits metaInfo) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Setting traits ("+metaInfo+") on "+this);
@@ -1285,21 +1284,23 @@ public abstract class Operator<T extends
     }
   }
 
-  public void setGroupKeyObjectInspector(ObjectInspector keyObjectInspector) {
-    this.groupKeyOI = keyObjectInspector;
-  }
-
-  public ObjectInspector getGroupKeyObjectInspector() {
-    return groupKeyOI;
-  }
-
   public static Operator createDummy() {
     return new DummyOperator();
   }
 
   private static class DummyOperator extends Operator {
     public DummyOperator() { super("dummy"); }
+    @Override
     public void processOp(Object row, int tag) { }
+    @Override
     public OperatorType getType() { return null; }
   }
+
+  /** Returns the first parent's tag-to-operator tree, or null when there are no parents. */
+  public Map<Integer, DummyStoreOperator> getTagToOperatorTree() {
+    if ((parentOperators != null) && (parentOperators.size() != 0)) {
+      return parentOperators.get(0).getTagToOperatorTree();
+    }
+    return null;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Mon Oct  6 04:00:39 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorAppMasterEventOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.AppMasterEventDesc;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
@@ -114,10 +116,16 @@ public final class OperatorFactory {
         RCFileMergeOperator.class));
     opvec.add(new OpTuple<OrcFileMergeDesc>(OrcFileMergeDesc.class,
         OrcFileMergeOperator.class));
+    opvec.add(new OpTuple<CommonMergeJoinDesc>(CommonMergeJoinDesc.class,
+        CommonMergeJoinOperator.class));
   }
 
   static {
     vectorOpvec = new ArrayList<OpTuple>();
+    vectorOpvec.add(new OpTuple<AppMasterEventDesc>(AppMasterEventDesc.class,
+        VectorAppMasterEventOperator.class));
+    vectorOpvec.add(new OpTuple<DynamicPruningEventDesc>(DynamicPruningEventDesc.class,
+        VectorAppMasterEventOperator.class));
     vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
     vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
     vectorOpvec.add(new OpTuple<MapJoinDesc>(MapJoinDesc.class, VectorMapJoinOperator.class));

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorUtils.java Mon Oct  6 04:00:39 2014
@@ -46,6 +46,9 @@ public class OperatorUtils {
   public static <T> Set<T> findOperators(Collection<Operator<?>> starts, Class<T> clazz) {
     Set<T> found = new HashSet<T>();
     for (Operator<?> start : starts) {
+      if (start == null) {
+        continue;
+      }
       findOperators(start, clazz, found);
     }
     return found;

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Mon Oct  6 04:00:39 2014
@@ -50,7 +50,6 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -67,6 +66,9 @@ public class ReduceSinkOperator extends 
   }
 
   private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
+  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
+  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
+  private static final boolean isTraceEnabled = LOG.isTraceEnabled();
   private static final long serialVersionUID = 1L;
   private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();
 
@@ -117,6 +119,8 @@ public class ReduceSinkOperator extends 
   protected transient Object[] cachedValues;
   protected transient List<List<Integer>> distinctColIndices;
   protected transient Random random;
+  protected transient int bucketNumber;
+
   /**
    * This two dimensional array holds key data and a corresponding Union object
    * which contains the tag identifying the aggregate expression for distinct columns.
@@ -144,8 +148,14 @@ public class ReduceSinkOperator extends 
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
       List<ExprNodeDesc> keys = conf.getKeyCols();
-      LOG.debug("keys size is " + keys.size());
-      for (ExprNodeDesc k : keys) LOG.debug("Key exprNodeDesc " + k.getExprString());
+
+      if (isDebugEnabled) {
+        LOG.debug("keys size is " + keys.size());
+        for (ExprNodeDesc k : keys) {
+          LOG.debug("Key exprNodeDesc " + k.getExprString());
+        }
+      }
+
       keyEval = new ExprNodeEvaluator[keys.size()];
       int i = 0;
       for (ExprNodeDesc e : keys) {
@@ -184,7 +194,9 @@ public class ReduceSinkOperator extends 
       tag = conf.getTag();
       tagByte[0] = (byte) tag;
       skipTag = conf.getSkipTag();
-      LOG.info("Using tag = " + tag);
+      if (isInfoEnabled) {
+        LOG.info("Using tag = " + tag);
+      }
 
       TableDesc keyTableDesc = conf.getKeySerializeInfo();
       keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
@@ -284,7 +296,10 @@ public class ReduceSinkOperator extends 
           bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
         }
 
-        LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " + conf.getNumDistributionKeys());
+        if (isInfoEnabled) {
+          LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " +
+              conf.getNumDistributionKeys());
+        }
         keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
             distinctColIndices,
             conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -304,15 +319,14 @@ public class ReduceSinkOperator extends 
       populateCachedDistributionKeys(row, 0);
 
       // replace bucketing columns with hashcode % numBuckets
-      int buckNum = -1;
       if (bucketEval != null) {
-        buckNum = computeBucketNumber(row, conf.getNumBuckets());
-        cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
+        bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
+        cachedKeys[0][buckColIdxInKey] = new Text(String.valueOf(bucketNumber));
       } else if (conf.getWriteType() == AcidUtils.Operation.UPDATE ||
           conf.getWriteType() == AcidUtils.Operation.DELETE) {
         // In the non-partitioned case we still want to compute the bucket number for updates and
         // deletes.
-        buckNum = computeBucketNumber(row, conf.getNumBuckets());
+        bucketNumber = computeBucketNumber(row, conf.getNumBuckets());
       }
 
       HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
@@ -328,7 +342,7 @@ public class ReduceSinkOperator extends 
       if (autoParallel && partitionEval.length > 0) {
         hashCode = computeMurmurHash(firstKey);
       } else {
-        hashCode = computeHashCode(row, buckNum);
+        hashCode = computeHashCode(row);
       }
 
       firstKey.setHashCode(hashCode);
@@ -377,7 +391,9 @@ public class ReduceSinkOperator extends 
       // column directly.
       Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
       buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
-      LOG.debug("Acid choosing bucket number " + buckNum);
+      if (isTraceEnabled) {
+        LOG.trace("Acid choosing bucket number " + buckNum);
+      }
     } else {
       for (int i = 0; i < bucketEval.length; i++) {
         Object o = bucketEval[i].evaluate(row);
@@ -422,7 +438,7 @@ public class ReduceSinkOperator extends 
     return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
   }
 
-  private int computeHashCode(Object row, int buckNum) throws HiveException {
+  private int computeHashCode(Object row) throws HiveException {
     // Evaluate the HashCode
     int keyHashCode = 0;
     if (partitionEval.length == 0) {
@@ -446,8 +462,10 @@ public class ReduceSinkOperator extends 
             + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
       }
     }
-    LOG.debug("Going to return hash code " + (keyHashCode * 31 + buckNum));
-    return buckNum < 0  ? keyHashCode : keyHashCode * 31 + buckNum;
+    if (isTraceEnabled) {
+      LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber));
+    }
+    return bucketNumber < 0  ? keyHashCode : keyHashCode * 31 + bucketNumber;
   }
 
   private boolean partitionKeysAreNull(Object row) throws HiveException {
@@ -493,10 +511,19 @@ public class ReduceSinkOperator extends 
   }
 
   private BytesWritable makeValueWritable(Object row) throws Exception {
+    int length = valueEval.length;
+
+    // in case of bucketed table, insert the bucket number as the last column in value
+    if (bucketEval != null) {
+      length -= 1;
+      cachedValues[length] = new Text(String.valueOf(bucketNumber));
+    }
+
     // Evaluate the value
-    for (int i = 0; i < valueEval.length; i++) {
+    for (int i = 0; i < length; i++) {
       cachedValues[i] = valueEval[i].evaluate(row);
     }
+
     // Serialize the value
     return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector);
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Mon Oct  6 04:00:39 2014
@@ -89,6 +89,7 @@ import org.apache.hadoop.hive.ql.plan.Fi
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -201,6 +202,8 @@ public final class Utilities {
   public static String HADOOP_LOCAL_FS = "file:///";
   public static String MAP_PLAN_NAME = "map.xml";
   public static String REDUCE_PLAN_NAME = "reduce.xml";
+  public static String MERGE_PLAN_NAME = "merge.xml";
+  public static final String INPUT_NAME = "iocontext.input.name";
   public static final String MAPRED_MAPPER_CLASS = "mapred.mapper.class";
   public static final String MAPRED_REDUCER_CLASS = "mapred.reducer.class";
 
@@ -291,6 +294,39 @@ public final class Utilities {
     return (ReduceWork) getBaseWork(conf, REDUCE_PLAN_NAME);
   }
 
+  /**
+   * Hands every base work of the merge join to setBaseWork under getName() + MERGE_PLAN_NAME and
+   * appends that name to the comma-separated DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES conf entry.
+   * There is no meaningful result: the method always returns null.
+   */
+  public static Path setMergeWork(JobConf conf, MergeJoinWork mergeJoinWork, Path mrScratchDir,
+      boolean useCache) {
+    for (BaseWork work : mergeJoinWork.getBaseWorkList()) {
+      String workName = work.getName();
+      setBaseWork(conf, work, mrScratchDir, workName + MERGE_PLAN_NAME, useCache);
+      String known = conf.get(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES);
+      conf.set(DagUtils.TEZ_MERGE_WORK_FILE_PREFIXES,
+          (known == null) ? workName : known + "," + workName);
+    }
+    return null;
+  }
+
+  /**
+   * Resolves the merge work for the DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX value of jconf.
+   * An unset or empty prefix yields null; the two-argument overload performs that check.
+   */
+  public static BaseWork getMergeWork(JobConf jconf) {
+    return getMergeWork(jconf, jconf.get(DagUtils.TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX));
+  }
+
+  /**
+   * Returns getBaseWork(jconf, prefix + MERGE_PLAN_NAME), or null for a null or empty prefix.
+   */
+  public static BaseWork getMergeWork(JobConf jconf, String prefix) {
+    boolean hasPrefix = (prefix != null) && !prefix.isEmpty();
+    return hasPrefix ? getBaseWork(jconf, prefix + MERGE_PLAN_NAME) : null;
+  }
+
   public static void cacheBaseWork(Configuration conf, String name, BaseWork work,
       Path hiveScratchDir) {
     try {
@@ -375,6 +411,8 @@ public final class Utilities {
             throw new RuntimeException("unable to determine work from configuration ."
                 + MAPRED_REDUCER_CLASS +" was "+ conf.get(MAPRED_REDUCER_CLASS)) ;
           }
+        } else if (name.contains(MERGE_PLAN_NAME)) {
+          gWork = deserializePlan(in, MapWork.class, conf);
         }
         gWorkMap.put(path, gWork);
       } else {
@@ -608,8 +646,14 @@ public final class Utilities {
   }
 
   public static void setMapRedWork(Configuration conf, MapredWork w, Path hiveScratchDir) {
+    String useName = conf.get(INPUT_NAME);
+    if (useName == null) {
+      useName = "mapreduce";
+    }
+    conf.set(INPUT_NAME, useName);
     setMapWork(conf, w.getMapWork(), hiveScratchDir, true);
     if (w.getReduceWork() != null) {
+      conf.set(INPUT_NAME, useName);
       setReduceWork(conf, w.getReduceWork(), hiveScratchDir, true);
     }
   }
@@ -1846,7 +1890,7 @@ public final class Utilities {
 
       for (int i = 0; i < parts.length; ++i) {
         assert parts[i].isDir() : "dynamic partition " + parts[i].getPath()
-            + " is not a direcgtory";
+            + " is not a directory";
         FileStatus[] items = fs.listStatus(parts[i].getPath());
 
         // remove empty directory since DP insert should not generate empty partitions.

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Mon Oct  6 04:00:39 2014
@@ -78,10 +78,11 @@ public class ExecMapper extends MapReduc
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
 
-  private final ExecMapperContext execContext = new ExecMapperContext();
+  private ExecMapperContext execContext = null;
 
   @Override
   public void configure(JobConf job) {
+    execContext = new ExecMapperContext(job);
     // Allocate the bean at the beginning -
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
@@ -292,6 +293,7 @@ public class ExecMapper extends MapReduc
       this.rp = rp;
     }
 
+    @Override
     public void func(Operator op) {
       Map<Enum<?>, Long> opStats = op.getStats();
       for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapperContext.java Mon Oct  6 04:00:39 2014
@@ -22,6 +22,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.mapred.JobConf;
@@ -60,8 +61,9 @@ public class ExecMapperContext {
     this.currentBigBucketFile = currentBigBucketFile;
   }
 
-  public ExecMapperContext() {
-    ioCxt = IOContext.get();
+  public ExecMapperContext(JobConf jc) {
+    this.jc = jc;
+    ioCxt = IOContext.get(jc.get(Utilities.INPUT_NAME));
   }
 
   public void clear() {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Mon Oct  6 04:00:39 2014
@@ -66,6 +66,8 @@ import org.apache.hadoop.util.StringUtil
 public class ExecReducer extends MapReduceBase implements Reducer {
 
   private static final Log LOG = LogFactory.getLog("ExecReducer");
+  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
+  private static final boolean isTraceEnabled = LOG.isTraceEnabled();
   private static final String PLAN_KEY = "__REDUCE_PLAN__";
 
   // used to log memory usage periodically
@@ -75,7 +77,6 @@ public class ExecReducer extends MapRedu
   private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
   private final Object[] valueObject = new Object[Byte.MAX_VALUE];
   private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
-  private final boolean isLogInfoEnabled = LOG.isInfoEnabled();
 
   // TODO: move to DynamicSerDe when it's ready
   private Deserializer inputKeyDeserializer;
@@ -101,16 +102,18 @@ public class ExecReducer extends MapRedu
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
 
-    LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+    if (isInfoEnabled) {
+      LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
 
-    try {
-      LOG.info("conf classpath = "
-          + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
-      LOG.info("thread classpath = "
-          + Arrays.asList(((URLClassLoader) Thread.currentThread()
-          .getContextClassLoader()).getURLs()));
-    } catch (Exception e) {
-      LOG.info("cannot get classpath: " + e.getMessage());
+      try {
+        LOG.info("conf classpath = "
+            + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+        LOG.info("thread classpath = "
+            + Arrays.asList(((URLClassLoader) Thread.currentThread()
+            .getContextClassLoader()).getURLs()));
+      } catch (Exception e) {
+        LOG.info("cannot get classpath: " + e.getMessage());
+      }
     }
     jc = job;
 
@@ -147,7 +150,6 @@ public class ExecReducer extends MapRedu
         ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
         ois.add(keyObjectInspector);
         ois.add(valueObjectInspector[tag]);
-        reducer.setGroupKeyObjectInspector(keyObjectInspector);
         rowObjectInspector[tag] = ObjectInspectorFactory
             .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
       }
@@ -202,7 +204,9 @@ public class ExecReducer extends MapRedu
           groupKey = new BytesWritable();
         } else {
           // If a operator wants to do some work at the end of a group
-          LOG.trace("End Group");
+          if (isTraceEnabled) {
+            LOG.trace("End Group");
+          }
           reducer.endGroup();
         }
 
@@ -217,9 +221,11 @@ public class ExecReducer extends MapRedu
         }
 
         groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
-        LOG.trace("Start Group");
-        reducer.setGroupKeyObject(keyObject);
+        if (isTraceEnabled) {
+          LOG.trace("Start Group");
+        }
         reducer.startGroup();
+        reducer.setGroupKeyObject(keyObject);
       }
       // System.err.print(keyObject.toString());
       while (values.hasNext()) {
@@ -239,12 +245,14 @@ public class ExecReducer extends MapRedu
         row.clear();
         row.add(keyObject);
         row.add(valueObject[tag]);
-        if (isLogInfoEnabled) {
+        if (isInfoEnabled) {
           cntr++;
           if (cntr == nextCntr) {
             long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            LOG.info("ExecReducer: processing " + cntr
-                + " rows: used memory = " + used_memory);
+            if (isInfoEnabled) {
+              LOG.info("ExecReducer: processing " + cntr
+                  + " rows: used memory = " + used_memory);
+            }
             nextCntr = getNextCntr(cntr);
           }
         }
@@ -290,17 +298,19 @@ public class ExecReducer extends MapRedu
   public void close() {
 
     // No row was processed
-    if (oc == null) {
+    if (oc == null && isTraceEnabled) {
       LOG.trace("Close called without any rows processed");
     }
 
     try {
       if (groupKey != null) {
         // If a operator wants to do some work at the end of a group
-        LOG.trace("End Group");
+        if (isTraceEnabled) {
+          LOG.trace("End Group");
+        }
         reducer.endGroup();
       }
-      if (isLogInfoEnabled) {
+      if (isInfoEnabled) {
         LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
             + memoryMXBean.getHeapMemoryUsage().getUsed());
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Mon Oct  6 04:00:39 2014
@@ -91,7 +91,7 @@ public class MapredLocalTask extends Tas
 
   // not sure we need this exec context; but all the operators in the work
   // will pass this context throught
-  private ExecMapperContext execContext = new ExecMapperContext();
+  private ExecMapperContext execContext = null;
 
   private Process executor;
 
@@ -113,6 +113,7 @@ public class MapredLocalTask extends Tas
   public void initialize(HiveConf conf, QueryPlan queryPlan, DriverContext driverContext) {
     super.initialize(conf, queryPlan, driverContext);
     job = new JobConf(conf, ExecDriver.class);
+    execContext = new ExecMapperContext(job);
     //we don't use the HadoopJobExecHooks for local tasks
     this.jobExecHelper = new HadoopJobExecHelper(job, console, this, null);
   }
@@ -301,6 +302,11 @@ public class MapredLocalTask extends Tas
     if (work == null) {
       return -1;
     }
+
+    if (execContext == null) {
+      execContext = new ExecMapperContext(job);
+    }
+
     memoryMXBean = ManagementFactory.getMemoryMXBean();
     long startTime = System.currentTimeMillis();
     console.printInfo(Utilities.now()

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Mon Oct  6 04:00:39 2014
@@ -31,6 +31,7 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
@@ -79,9 +80,14 @@ public class CustomPartitionVertex exten
   private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
-  private boolean rootVertexInitialized = false;
   private final SplitGrouper grouper = new SplitGrouper();
   private int taskCount = 0;
+  private VertexType vertexType;
+  private String mainWorkName;
+  private final Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
+
+  private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
+      new HashMap<String, Multimap<Integer, InputSplit>>();
 
   public CustomPartitionVertex(VertexManagerPluginContext context) {
     super(context);
@@ -90,8 +96,18 @@ public class CustomPartitionVertex exten
   @Override
   public void initialize() {
     this.context = getContext();
-    ByteBuffer byteBuf = context.getUserPayload().getPayload();
-    this.numBuckets = byteBuf.getInt();
+    ByteBuffer payload = context.getUserPayload().getPayload();
+    CustomVertexConfiguration vertexConf = new CustomVertexConfiguration();
+    DataInputByteBuffer dibb = new DataInputByteBuffer();
+    dibb.reset(payload);
+    try {
+      vertexConf.readFields(dibb);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+    this.numBuckets = vertexConf.getNumBuckets();
+    this.mainWorkName = vertexConf.getInputName();
+    this.vertexType = vertexConf.getVertexType();
   }
 
   @Override
@@ -113,17 +129,12 @@ public class CustomPartitionVertex exten
   public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
   }
 
-  // One call per root Input - and for now only one is handled.
+  // One call per root Input
   @Override
   public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
       List<Event> events) {
+    LOG.info("On root vertex initialized " + inputName);
 
-    // Ideally, since there's only 1 Input expected at the moment -
-    // ensure this method is called only once. Tez will call it once per Root
-    // Input.
-    Preconditions.checkState(rootVertexInitialized == false);
-    LOG.info("Root vertex not initialized");
-    rootVertexInitialized = true;
     try {
       // This is using the payload from the RootVertexInitializer corresponding
       // to InputName. Ideally it should be using it's own configuration class -
@@ -164,9 +175,6 @@ public class CustomPartitionVertex exten
         // No tasks should have been started yet. Checked by initial state
         // check.
         Preconditions.checkState(dataInformationEventSeen == false);
-        Preconditions
-            .checkState(context.getVertexNumTasks(context.getVertexName()) == -1,
-                "Parallelism for the vertex should be set to -1 if the InputInitializer is setting parallelism");
         InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
@@ -220,21 +228,55 @@ public class CustomPartitionVertex exten
             (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
         Multimap<Integer, InputSplit> groupedSplit =
             HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-            availableSlots);
+                availableSlots, inputName);
         bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
       }
 
-      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap.size() + " tasks");
-      processAllEvents(inputName, bucketToGroupedSplitMap);
+      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap);
+      if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) {
+        /*
+         * this is the small table side. In case of SMB join, we may need to send each split to the
+         * corresponding bucket-based task on the other side. In case a split needs to go to
+         * multiple downstream tasks, we need to clone the event and send it to the right
+         * destination.
+         */
+        processAllSideEvents(inputName, bucketToGroupedSplitMap);
+      } else {
+        processAllEvents(inputName, bucketToGroupedSplitMap);
+      }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
+  /**
+   * Routes the grouped splits of a side input to every task of the matching bucket. While
+   * bucketToTaskMap is still empty the splits are only parked in inputToGroupedSplitMap.
+   */
+  private void processAllSideEvents(String inputName,
+      Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
+    if (bucketToTaskMap.isEmpty()) {
+      inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
+      return;
+    }
+    List<InputDataInformationEvent> sideEvents = new ArrayList<InputDataInformationEvent>();
+    for (Entry<Integer, Collection<InputSplit>> bucket : bucketToGroupedSplitMap.asMap().entrySet()) {
+      for (Integer taskId : bucketToTaskMap.get(bucket.getKey())) {
+        for (InputSplit split : bucket.getValue()) {
+          MRSplitProto splitProto = MRInputHelpers.createSplitProto(split);
+          InputDataInformationEvent event = InputDataInformationEvent.createWithSerializedPayload(
+              taskId, splitProto.toByteString().asReadOnlyByteBuffer());
+          event.setTargetIndex(taskId);
+          sideEvents.add(event);
+        }
+      }
+    }
+    context.addRootInputEvents(inputName, sideEvents);
+  }
+
   private void processAllEvents(String inputName,
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
 
-    Multimap<Integer, Integer> bucketToTaskMap = HashMultimap.<Integer, Integer> create();
     List<InputSplit> finalSplits = Lists.newLinkedList();
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
       int bucketNum = entry.getKey();
@@ -248,11 +290,13 @@ public class CustomPartitionVertex exten
 
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
-    EdgeManagerPluginDescriptor hiveEdgeManagerDesc =
-        EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
-    UserPayload payload = getBytePayload(bucketToTaskMap);
-    hiveEdgeManagerDesc.setUserPayload(payload);
-
+    EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null;
+    if ((vertexType == VertexType.MULTI_INPUT_INITIALIZED_EDGES)
+        || (vertexType == VertexType.INITIALIZED_EDGES)) {
+      hiveEdgeManagerDesc = EdgeManagerPluginDescriptor.create(CustomPartitionEdge.class.getName());
+      UserPayload payload = getBytePayload(bucketToTaskMap);
+      hiveEdgeManagerDesc.setUserPayload(payload);
+    }
     Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type custom.
@@ -285,13 +329,21 @@ public class CustomPartitionVertex exten
     rootInputSpecUpdate.put(
         inputName,
         InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
-    context.setVertexParallelism(
-        taskCount,
-        VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
-            .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
+    if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) {
+      context.setVertexParallelism(
+          taskCount,
+          VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
+              .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
+    }
 
     // Set the actual events for the tasks.
     context.addRootInputEvents(inputName, taskEvents);
+    if (inputToGroupedSplitMap.isEmpty() == false) {
+      for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
+        processAllSideEvents(entry.getKey(), entry.getValue());
+      }
+      inputToGroupedSplitMap.clear();
+    }
   }
 
   UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
@@ -315,7 +367,8 @@ public class CustomPartitionVertex exten
 
     if (!(inputSplit instanceof FileSplit)) {
       throw new UnsupportedOperationException(
-          "Cannot handle splits other than FileSplit for the moment");
+          "Cannot handle splits other than FileSplit for the moment. Current input split type: "
+              + inputSplit.getClass().getSimpleName());
     }
     return (FileSplit) inputSplit;
   }
@@ -327,7 +380,6 @@ public class CustomPartitionVertex exten
       Map<String, List<FileSplit>> pathFileSplitsMap) {
 
     int bucketNum = 0;
-    int fsCount = 0;
 
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         ArrayListMultimap.<Integer, InputSplit> create();
@@ -335,14 +387,20 @@ public class CustomPartitionVertex exten
     for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
-        fsCount++;
         bucketToInitialSplitMap.put(bucketId, fsplit);
       }
       bucketNum++;
     }
 
-    LOG.info("Total number of splits counted: " + fsCount + " and total files encountered: "
-        + pathFileSplitsMap.size());
+    if (bucketNum < numBuckets) {
+      int loopedBucketId = 0;
+      for (; bucketNum < numBuckets; bucketNum++) {
+        for (InputSplit fsplit : bucketToInitialSplitMap.get(loopedBucketId)) {
+          bucketToInitialSplitMap.put(bucketNum, fsplit);
+        }
+        loopedBucketId++;
+      }
+    }
 
     return bucketToInitialSplitMap;
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Oct  6 04:00:39 2014
@@ -20,6 +20,23 @@ package org.apache.hadoop.hive.ql.exec.t
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
+import com.google.protobuf.ByteString;
+
+import javax.security.auth.login.LoginException;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
@@ -32,6 +49,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -47,10 +65,12 @@ import org.apache.hadoop.hive.ql.io.merg
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MergeJoinWork;
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
@@ -90,12 +110,16 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
+import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
+import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
 import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
@@ -104,21 +128,6 @@ import org.apache.tez.runtime.library.co
 import org.apache.tez.runtime.library.conf.UnorderedPartitionedKVEdgeConfig;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
 
-import javax.security.auth.login.LoginException;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-
 /**
  * DagUtils. DagUtils is a collection of helper methods to convert
  * map and reduce work to tez vertices and edges. It handles configuration
@@ -130,6 +139,11 @@ public class DagUtils {
   private static final Log LOG = LogFactory.getLog(DagUtils.class.getName());
   private static final String TEZ_DIR = "_tez_scratch_dir";
   private static DagUtils instance;
+  // The merge file being currently processed.
+  public static final String TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX =
+      "hive.tez.current.merge.file.prefix";
+  // A comma-separated list of work names used as prefixes.
+  public static final String TEZ_MERGE_WORK_FILE_PREFIXES = "hive.tez.merge.file.prefixes";
 
   private void addCredentials(MapWork mapWork, DAG dag) {
     Set<String> paths = mapWork.getPathToAliases().keySet();
@@ -238,8 +252,8 @@ public class DagUtils {
    * endpoints.
    */
   @SuppressWarnings("rawtypes")
-  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf,
-      Vertex w, TezEdgeProperty edgeProp)
+  public GroupInputEdge createEdge(VertexGroup group, JobConf vConf, Vertex w,
+      TezEdgeProperty edgeProp, VertexType vertexType)
     throws IOException {
 
     Class mergeInputClass;
@@ -254,10 +268,14 @@ public class DagUtils {
     case CUSTOM_EDGE: {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
+      CustomVertexConfiguration vertexConf =
+          new CustomVertexConfiguration(numBuckets, vertexType, "");
+      DataOutputBuffer dob = new DataOutputBuffer();
+      vertexConf.write(dob);
       VertexManagerPluginDescriptor desc =
           VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
-      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
-      userPayload.flip();
+      byte[] userPayloadBytes = dob.getData();
+      ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
       desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
@@ -289,17 +307,21 @@ public class DagUtils {
    * @param w The second vertex (sink)
    * @return
    */
-  public Edge createEdge(JobConf vConf, Vertex v, Vertex w,
-      TezEdgeProperty edgeProp)
+  public Edge createEdge(JobConf vConf, Vertex v, Vertex w, TezEdgeProperty edgeProp,
+      VertexType vertexType)
     throws IOException {
 
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      ByteBuffer userPayload = ByteBuffer.allocate(4).putInt(numBuckets);
-      userPayload.flip();
+      CustomVertexConfiguration vertexConf =
+          new CustomVertexConfiguration(numBuckets, vertexType, "");
+      DataOutputBuffer dob = new DataOutputBuffer();
+      vertexConf.write(dob);
       VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
           CustomPartitionVertex.class.getName());
+      byte[] userPayloadBytes = dob.getData();
+      ByteBuffer userPayload = ByteBuffer.wrap(userPayloadBytes);
       desc.setUserPayload(UserPayload.create(userPayload));
       w.setVertexManagerPlugin(desc);
       break;
@@ -405,7 +427,7 @@ public class DagUtils {
    * from yarn. Falls back to Map-reduce's map size if tez
    * container size isn't set.
    */
-  private Resource getContainerResource(Configuration conf) {
+  public static Resource getContainerResource(Configuration conf) {
     int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
       HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
       conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
@@ -443,12 +465,61 @@ public class DagUtils {
     return MRHelpers.getJavaOptsForMRMapper(conf);
   }
 
+  private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
+      List<LocalResource> additionalLr, FileSystem fs, Path mrScratchDir, Context ctx,
+      VertexType vertexType)
+      throws Exception {
+    Utilities.setMergeWork(conf, mergeJoinWork, mrScratchDir, false);
+    if (mergeJoinWork.getMainWork() instanceof MapWork) {
+      List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
+      MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
+      CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
+      Vertex mergeVx =
+          createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
+
+      // Grouping happens in the execution phase. Setting the class to TezGroupedSplitsInputFormat
+      // here would cause premature grouping, which would be incorrect.
+      Class inputFormatClass = HiveInputFormat.class;
+      conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
+      // mapreduce.tez.input.initializer.serialize.event.payload should be set
+      // to false when using this plug-in to avoid getting a serialized event at run-time.
+      conf.setBoolean("mapreduce.tez.input.initializer.serialize.event.payload", false);
+      for (int i = 0; i < mapWorkList.size(); i++) {
+
+        mapWork = (MapWork) (mapWorkList.get(i));
+        conf.set(TEZ_MERGE_CURRENT_MERGE_FILE_PREFIX, mapWork.getName());
+        conf.set(Utilities.INPUT_NAME, mapWork.getName());
+        LOG.info("Going through each work and adding MultiMRInput");
+        mergeVx.addDataSource(mapWork.getName(),
+            MultiMRInput.createConfigBuilder(conf, HiveInputFormat.class).build());
+      }
+
+      VertexManagerPluginDescriptor desc =
+        VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+      CustomVertexConfiguration vertexConf =
+          new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
+              .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias());
+      DataOutputBuffer dob = new DataOutputBuffer();
+      vertexConf.write(dob);
+      byte[] userPayload = dob.getData();
+      desc.setUserPayload(UserPayload.create(ByteBuffer.wrap(userPayload)));
+      mergeVx.setVertexManagerPlugin(desc);
+      return mergeVx;
+    } else {
+      Vertex mergeVx =
+          createVertex(conf, (ReduceWork) mergeJoinWork.getMainWork(), appJarLr, additionalLr, fs,
+              mrScratchDir, ctx);
+      return mergeVx;
+    }
+  }
+
   /*
    * Helper function to create Vertex from MapWork.
    */
   private Vertex createVertex(JobConf conf, MapWork mapWork,
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
-      Path mrScratchDir, Context ctx, TezWork tezWork) throws Exception {
+      Path mrScratchDir, Context ctx, VertexType vertexType)
+      throws Exception {
 
     Path tezDir = getTezDir(mrScratchDir);
 
@@ -470,15 +541,8 @@ public class DagUtils {
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
-    boolean vertexHasCustomInput = false;
-    if (tezWork != null) {
-      for (BaseWork baseWork : tezWork.getParents(mapWork)) {
-        if (tezWork.getEdgeType(baseWork, mapWork) == EdgeType.CUSTOM_EDGE) {
-          vertexHasCustomInput = true;
-        }
-      }
-    }
-
+    boolean vertexHasCustomInput = VertexType.isCustomInputType(vertexType);
+    LOG.info("Vertex has custom input? " + vertexHasCustomInput);
     if (vertexHasCustomInput) {
       groupSplitsInInputInitializer = false;
       // grouping happens in execution phase. The input payload should not enable grouping here,
@@ -513,6 +577,8 @@ public class DagUtils {
       }
     }
 
+    // remember mapping of plan to input
+    conf.set(Utilities.INPUT_NAME, mapWork.getName());
     if (HiveConf.getBoolVar(conf, ConfVars.HIVE_AM_SPLIT_GENERATION)
         && !mapWork.isUseOneNullRowInputFormat()) {
 
@@ -593,6 +659,7 @@ public class DagUtils {
       Path mrScratchDir, Context ctx) throws Exception {
 
     // set up operator plan
+    conf.set(Utilities.INPUT_NAME, reduceWork.getName());
     Utilities.setReduceWork(conf, reduceWork, mrScratchDir, false);
 
     // create the directories FileSinkOperators need
@@ -937,12 +1004,22 @@ public class DagUtils {
       return initializeVertexConf(conf, context, (MapWork)work);
     } else if (work instanceof ReduceWork) {
       return initializeVertexConf(conf, context, (ReduceWork)work);
+    } else if (work instanceof MergeJoinWork) {
+      return initializeVertexConf(conf, context, (MergeJoinWork) work);
     } else {
       assert false;
       return null;
     }
   }
 
+  private JobConf initializeVertexConf(JobConf conf, Context context, MergeJoinWork work) {
+    if (work.getMainWork() instanceof MapWork) {
+      return initializeVertexConf(conf, context, (MapWork) (work.getMainWork()));
+    } else {
+      return initializeVertexConf(conf, context, (ReduceWork) (work.getMainWork()));
+    }
+  }
+
   /**
    * Create a vertex from a given work object.
    *
@@ -958,18 +1035,21 @@ public class DagUtils {
    */
   public Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, LocalResource appJarLr,
-      List<LocalResource> additionalLr,
-      FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception {
+      List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,
+      TezWork tezWork, VertexType vertexType) throws Exception {
 
     Vertex v = null;
     // simply dispatch the call to the right method for the actual (sub-) type of
     // BaseWork.
     if (work instanceof MapWork) {
-      v = createVertex(conf, (MapWork) work, appJarLr,
-          additionalLr, fileSystem, scratchDir, ctx, tezWork);
+      v = createVertex(conf, (MapWork) work, appJarLr, additionalLr, fileSystem, scratchDir, ctx,
+              vertexType);
     } else if (work instanceof ReduceWork) {
       v = createVertex(conf, (ReduceWork) work, appJarLr,
           additionalLr, fileSystem, scratchDir, ctx);
+    } else if (work instanceof MergeJoinWork) {
+      v = createVertex(conf, (MergeJoinWork) work, appJarLr, additionalLr, fileSystem, scratchDir,
+              ctx, vertexType);
     } else {
       // something is seriously wrong if this is happening
       throw new HiveException(ErrorMsg.GENERIC_ERROR.getErrorCodedMsg());

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1629563&r1=1629562&r2=1629563&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Mon Oct  6 04:00:39 2014
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -59,6 +60,7 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.runtime.api.InputInitializerContext;
 import org.apache.tez.runtime.api.events.InputInitializerEvent;
 
@@ -77,12 +79,13 @@ public class DynamicPartitionPruner {
 
   private final BytesWritable writable = new BytesWritable();
 
-  private final BlockingQueue<InputInitializerEvent> queue =
-      new LinkedBlockingQueue<InputInitializerEvent>();
+  private final BlockingQueue<Object> queue = new LinkedBlockingQueue<Object>();
+
+  private final Set<String> sourcesWaitingForEvents = new HashSet<String>();
 
   private int sourceInfoCount = 0;
 
-  private InputInitializerContext context;
+  private final Object endOfEvents = new Object();
 
   public DynamicPartitionPruner() {
   }
@@ -91,8 +94,21 @@ public class DynamicPartitionPruner {
       throws SerDeException, IOException,
       InterruptedException, HiveException {
 
-    this.context = context;
-    this.initialize(work, jobConf);
+    synchronized(sourcesWaitingForEvents) {
+      initialize(work, jobConf);
+
+      if (sourcesWaitingForEvents.isEmpty()) {
+        return;
+      }
+
+      Set<VertexState> states = Collections.singleton(VertexState.SUCCEEDED);
+      for (String source : sourcesWaitingForEvents) {
+        // We need state-transition updates for the vertices that will send events to
+        // us. Once all events have been received and every source vertex has succeeded,
+        // we can move on to the pruning.
+        context.registerForVertexStateUpdates(source, states);
+      }
+    }
 
     LOG.info("Waiting for events (" + sourceInfoCount + " items) ...");
     // synchronous event processing loop. Won't return until all events have
@@ -102,7 +118,7 @@ public class DynamicPartitionPruner {
     LOG.info("Ok to proceed.");
   }
 
-  public BlockingQueue<InputInitializerEvent> getQueue() {
+  public BlockingQueue<Object> getQueue() {
     return queue;
   }
 
@@ -111,11 +127,14 @@ public class DynamicPartitionPruner {
     sourceInfoCount = 0;
   }
 
-  private void initialize(MapWork work, JobConf jobConf) throws SerDeException {
+  public void initialize(MapWork work, JobConf jobConf) throws SerDeException {
     this.clear();
     Map<String, SourceInfo> columnMap = new HashMap<String, SourceInfo>();
+    Set<String> sources = work.getEventSourceTableDescMap().keySet();
+
+    sourcesWaitingForEvents.addAll(sources);
 
-    for (String s : work.getEventSourceTableDescMap().keySet()) {
+    for (String s : sources) {
       List<TableDesc> tables = work.getEventSourceTableDescMap().get(s);
       List<String> columnNames = work.getEventSourceColumnNameMap().get(s);
       List<ExprNodeDesc> partKeyExprs = work.getEventSourcePartKeyExprMap().get(s);
@@ -277,46 +296,30 @@ public class DynamicPartitionPruner {
 
   private void processEvents() throws SerDeException, IOException, InterruptedException {
     int eventCount = 0;
-    int neededEvents = getExpectedNumberOfEvents();
 
-    while (neededEvents > eventCount) {
-      InputInitializerEvent event = queue.take();
+    while (true) {
+      Object element = queue.take();
+
+      if (element == endOfEvents) {
+        // we're done processing events
+        break;
+      }
+
+      InputInitializerEvent event = (InputInitializerEvent) element;
+
       LOG.info("Input event: " + event.getTargetInputName() + ", " + event.getTargetVertexName()
           + ", " + (event.getUserPayload().limit() - event.getUserPayload().position()));
-      processPayload(event.getUserPayload());
+      processPayload(event.getUserPayload(), event.getSourceVertexName());
       eventCount += 1;
-      neededEvents = getExpectedNumberOfEvents();
-      LOG.info("Needed events: " + neededEvents + ", received events: " + eventCount);
     }
-  }
-
-  private int getExpectedNumberOfEvents() throws InterruptedException {
-    int neededEvents = 0;
-
-    boolean notInitialized;
-    do {
-      neededEvents = 0;
-      notInitialized = false;
-      for (String s : sourceInfoMap.keySet()) {
-        int multiplier = sourceInfoMap.get(s).size();
-        int taskNum = context.getVertexNumTasks(s);
-        LOG.info("Vertex " + s + " has " + taskNum + " events.");
-        if (taskNum < 0) {
-          notInitialized = true;
-          Thread.sleep(10);
-          continue;
-        }
-        neededEvents += (taskNum * multiplier);
-      }
-    } while (notInitialized);
-
-    return neededEvents;
+    LOG.info("Received events: " + eventCount);
   }
 
   @SuppressWarnings("deprecation")
-  private String processPayload(ByteBuffer payload) throws SerDeException, IOException {
+  private String processPayload(ByteBuffer payload, String sourceName) throws SerDeException,
+      IOException {
+
     DataInputStream in = new DataInputStream(new ByteBufferBackedInputStream(payload));
-    String sourceName = in.readUTF();
     String columnName = in.readUTF();
     boolean skip = in.readBoolean();
 
@@ -390,4 +393,26 @@ public class DynamicPartitionPruner {
     }
   }
 
+  public void addEvent(InputInitializerEvent event) {
+    synchronized(sourcesWaitingForEvents) {
+      if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
+          queue.offer(event);
+      }
+    }
+  }
+
+  public void processVertex(String name) {
+    LOG.info("Vertex succeeded: " + name);
+
+    synchronized(sourcesWaitingForEvents) {
+      sourcesWaitingForEvents.remove(name);
+
+      if (sourcesWaitingForEvents.isEmpty()) {
+        // we've got what we need; mark the queue
+        queue.offer(endOfEvents);
+      } else {
+        LOG.info("Waiting for " + sourcesWaitingForEvents.size() + " events.");
+      }
+    }
+  }
 }