You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by br...@apache.org on 2014/10/30 17:22:48 UTC

svn commit: r1635536 [8/28] - in /hive/branches/spark: ./ accumulo-handler/ accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hadoop/hive/accumulo/columns/ accumulo-handler/src/test/org/apache/hadoo...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Thu Oct 30 16:22:33 2014
@@ -25,8 +25,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -51,10 +49,13 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.util.hash.MurmurHash;
 
+import static org.apache.hadoop.hive.ql.plan.ReduceSinkDesc.ReducerTraits.UNIFORM;
+
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
@@ -65,10 +66,13 @@ public class ReduceSinkOperator extends 
     PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
   }
 
-  private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
-  private static final boolean isInfoEnabled = LOG.isInfoEnabled();
-  private static final boolean isDebugEnabled = LOG.isDebugEnabled();
-  private static final boolean isTraceEnabled = LOG.isTraceEnabled();
+  /**
+   * Counters.
+   */
+  public static enum Counter {
+    RECORDS_OUT_INTERMEDIATE
+  }
+
   private static final long serialVersionUID = 1L;
   private static final MurmurHash hash = (MurmurHash) MurmurHash.getInstance();
 
@@ -110,7 +114,7 @@ public class ReduceSinkOperator extends 
   protected transient int numDistributionKeys;
   protected transient int numDistinctExprs;
   protected transient String[] inputAliases;  // input aliases of this RS for join (used for PPD)
-  protected transient boolean autoParallel = false;
+  protected transient boolean useUniformHash = false;
   // picks topN K:V pairs from input.
   protected transient TopNHash reducerHash = new TopNHash();
   protected transient HiveKey keyWritable = new HiveKey();
@@ -144,12 +148,25 @@ public class ReduceSinkOperator extends 
   private StructObjectInspector recIdInspector; // OI for the record identifier
   private IntObjectInspector bucketInspector; // OI for the bucket field in the record id
 
+  protected transient long numRows = 0;
+  protected transient long cntr = 1;
+  private final transient LongWritable recordCounter = new LongWritable();
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
     try {
+
+      numRows = 0;
+
+      String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+      if (context != null && !context.isEmpty()) {
+        context = "_" + context.replace(" ","_");
+      }
+      statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+
       List<ExprNodeDesc> keys = conf.getKeyCols();
 
-      if (isDebugEnabled) {
+      if (isLogDebugEnabled) {
         LOG.debug("keys size is " + keys.size());
         for (ExprNodeDesc k : keys) {
           LOG.debug("Key exprNodeDesc " + k.getExprString());
@@ -194,7 +211,7 @@ public class ReduceSinkOperator extends 
       tag = conf.getTag();
       tagByte[0] = (byte) tag;
       skipTag = conf.getSkipTag();
-      if (isInfoEnabled) {
+      if (isLogInfoEnabled) {
         LOG.info("Using tag = " + tag);
       }
 
@@ -217,7 +234,7 @@ public class ReduceSinkOperator extends 
         reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
       }
 
-      autoParallel = conf.isAutoParallel();
+      useUniformHash = conf.getReducerTraits().contains(UNIFORM);
 
       firstRow = true;
       initializeChildren(hconf);
@@ -296,7 +313,7 @@ public class ReduceSinkOperator extends 
           bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
         }
 
-        if (isInfoEnabled) {
+        if (isLogInfoEnabled) {
           LOG.info("keys are " + conf.getOutputKeyColumnNames() + " num distributions: " +
               conf.getNumDistributionKeys());
         }
@@ -339,10 +356,10 @@ public class ReduceSinkOperator extends 
       final int hashCode;
 
       // distKeyLength doesn't include tag, but includes buckNum in cachedKeys[0]
-      if (autoParallel && partitionEval.length > 0) {
+      if (useUniformHash && partitionEval.length > 0) {
         hashCode = computeMurmurHash(firstKey);
       } else {
-        hashCode = computeHashCode(row);
+        hashCode = computeHashCode(row, bucketNumber);
       }
 
       firstKey.setHashCode(hashCode);
@@ -391,7 +408,7 @@ public class ReduceSinkOperator extends 
       // column directly.
       Object recIdValue = acidRowInspector.getStructFieldData(row, recIdField);
       buckNum = bucketInspector.get(recIdInspector.getStructFieldData(recIdValue, bucketField));
-      if (isTraceEnabled) {
+      if (isLogTraceEnabled) {
         LOG.trace("Acid choosing bucket number " + buckNum);
       }
     } else {
@@ -438,7 +455,7 @@ public class ReduceSinkOperator extends 
     return hash.hash(firstKey.getBytes(), firstKey.getDistKeyLength(), 0);
   }
 
-  private int computeHashCode(Object row) throws HiveException {
+  private int computeHashCode(Object row, int buckNum) throws HiveException {
     // Evaluate the HashCode
     int keyHashCode = 0;
     if (partitionEval.length == 0) {
@@ -462,10 +479,11 @@ public class ReduceSinkOperator extends 
             + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
       }
     }
-    if (isTraceEnabled) {
-      LOG.trace("Going to return hash code " + (keyHashCode * 31 + bucketNumber));
+    int hashCode = buckNum < 0 ? keyHashCode : keyHashCode * 31 + buckNum;
+    if (isLogTraceEnabled) {
+      LOG.trace("Going to return hash code " + hashCode);
     }
-    return bucketNumber < 0  ? keyHashCode : keyHashCode * 31 + bucketNumber;
+    return hashCode;
   }
 
   private boolean partitionKeysAreNull(Object row) throws HiveException {
@@ -506,6 +524,13 @@ public class ReduceSinkOperator extends 
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
     if (null != out) {
+      numRows++;
+      if (isLogInfoEnabled) {
+        if (numRows == cntr) {
+          cntr *= 10;
+          LOG.info(toString() + ": records written - " + numRows);
+        }
+      }
       out.collect(keyWritable, valueWritable);
     }
   }
@@ -535,6 +560,10 @@ public class ReduceSinkOperator extends 
     }
     super.closeOp(abort);
     out = null;
+    if (isLogInfoEnabled) {
+      LOG.info(toString() + ": records written - " + numRows);
+    }
+    recordCounter.set(numRows);
   }
 
   /**

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/RowSchema.java Thu Oct 30 16:22:33 2014
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Iterator;
 
 /**
  * RowSchema Implementation.
@@ -49,6 +50,51 @@ public class RowSchema implements Serial
   }
 
   @Override
+  public boolean equals(Object obj) {
+    if (!(obj instanceof RowSchema) || (obj == null)) {
+      return false;
+    }
+    if(this == obj) {
+      return true;
+    }
+
+    RowSchema dest = (RowSchema)obj;
+    if(this.signature == null && dest.getSignature() == null) {
+      return true;
+    }
+    if((this.signature == null && dest.getSignature() != null) ||
+        (this.signature != null && dest.getSignature() == null) ) {
+      return false;
+    }
+
+    if(this.signature.size() != dest.getSignature().size()) {
+      return false;
+    }
+
+    Iterator<ColumnInfo> origIt = this.signature.iterator();
+    Iterator<ColumnInfo> destIt = dest.getSignature().iterator();
+    while(origIt.hasNext()) {
+      ColumnInfo origColumn = origIt.next();
+      ColumnInfo destColumn = destIt.next();
+
+      if(origColumn == null && destColumn == null) {
+        continue;
+      }
+
+      if((origColumn == null && destColumn != null) ||
+          (origColumn != null && destColumn == null) ) {
+        return false;
+      }
+
+      if(!origColumn.equals(destColumn)) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+  @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
     sb.append('(');

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Thu Oct 30 16:22:33 2014
@@ -27,8 +27,10 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
@@ -83,6 +85,8 @@ public class ScriptOperator extends Oper
   transient Deserializer scriptOutputDeserializer;
   transient volatile Throwable scriptError = null;
   transient RecordWriter scriptOutWriter = null;
+  // List of conf entries not to turn into env vars
+  transient Set<String> blackListedConfEntries = null;
 
   static final String IO_EXCEPTION_BROKEN_PIPE_STRING = "Broken pipe";
   static final String IO_EXCEPTION_STREAM_CLOSED = "Stream closed";
@@ -120,7 +124,8 @@ public class ScriptOperator extends Oper
 
   /**
    * Most UNIX implementations impose some limit on the total size of environment variables and
-   * size of strings. To fit in this limit we need sometimes to truncate strings.
+   * size of strings. To fit in this limit we need sometimes to truncate strings.  Also,
+   * some values tend be long and are meaningless to scripts, so strain them out.
    * @param value environment variable value to check
    * @param name name of variable (used only for logging purposes)
    * @param truncate truncate value or not
@@ -139,6 +144,23 @@ public class ScriptOperator extends Oper
     return value;
   }
 
+  boolean blackListed(String name) {
+    if (blackListedConfEntries == null) {
+      blackListedConfEntries = new HashSet<String>();
+      if (hconf != null) {
+        String bl = hconf.get(HiveConf.ConfVars.HIVESCRIPT_ENV_BLACKLIST.toString());
+        if (bl != null && bl.length() > 0) {
+          String[] bls = bl.split(",");
+          for (String b : bls) {
+            b.replaceAll(".", "_");
+            blackListedConfEntries.add(b);
+          }
+        }
+      }
+    }
+    return blackListedConfEntries.contains(name);
+  }
+
   /**
    * addJobConfToEnvironment is mostly shamelessly copied from hadoop streaming. Added additional
    * check on environment variable length
@@ -148,13 +170,16 @@ public class ScriptOperator extends Oper
     while (it.hasNext()) {
       Map.Entry<String, String> en = it.next();
       String name = en.getKey();
-      // String value = (String)en.getValue(); // does not apply variable
-      // expansion
-      String value = conf.get(name); // does variable expansion
-      name = safeEnvVarName(name);
-      boolean truncate = conf.getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false);
-      value = safeEnvVarValue(value, name, truncate);
-      env.put(name, value);
+      if (!blackListed(name)) {
+        // String value = (String)en.getValue(); // does not apply variable
+        // expansion
+        String value = conf.get(name); // does variable expansion
+        name = safeEnvVarName(name);
+        boolean truncate = conf
+            .getBoolean(HiveConf.ConfVars.HIVESCRIPTTRUNCATEENV.toString(), false);
+        value = safeEnvVarValue(value, name, truncate);
+        env.put(name, value);
+      }
     }
   }
 
@@ -238,8 +263,8 @@ public class ScriptOperator extends Oper
   protected void initializeOp(Configuration hconf) throws HiveException {
     firstRow = true;
 
-    statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
-    statsMap.put(Counter.SERIALIZE_ERRORS, serialize_error_count);
+    statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+    statsMap.put(Counter.SERIALIZE_ERRORS.toString(), serialize_error_count);
 
     try {
       this.hconf = hconf;
@@ -285,6 +310,16 @@ public class ScriptOperator extends Oper
     return;
   }
 
+  private transient String tableName;
+  private transient String partitionName ;
+
+  @Override
+  public void setInputContext(String inputPath, String tableName, String partitionName) {
+    this.tableName = tableName;
+    this.partitionName = partitionName;
+    super.setInputContext(inputPath, tableName, partitionName);
+  }
+
   @Override
   public void processOp(Object row, int tag) throws HiveException {
     // initialize the user's process only when you receive the first row
@@ -313,10 +348,8 @@ public class ScriptOperator extends Oper
 
         String[] wrappedCmdArgs = addWrapper(cmdArgs);
         LOG.info("Executing " + Arrays.asList(wrappedCmdArgs));
-        LOG.info("tablename="
-            + hconf.get(HiveConf.ConfVars.HIVETABLENAME.varname));
-        LOG.info("partname="
-            + hconf.get(HiveConf.ConfVars.HIVEPARTITIONNAME.varname));
+        LOG.info("tablename=" + tableName);
+        LOG.info("partname=" + partitionName);
         LOG.info("alias=" + alias);
 
         ProcessBuilder pb = new ProcessBuilder(wrappedCmdArgs);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Thu Oct 30 16:22:33 2014
@@ -125,4 +125,31 @@ public class SelectOperator extends Oper
   public boolean acceptLimitPushdown() {
     return true;
   }
+
+  /**
+   * Checks whether this select operator does something to the
+   * input tuples.
+   *
+   * @return if it is an identity select operator or not
+   */
+  public boolean isIdentitySelect() {
+    //Safety check
+    if(this.getNumParent() != 1) {
+      return false;
+    }
+
+    //Select *
+    if(this.getConf().isSelStarNoCompute() ||
+        this.getConf().isSelectStar()) {
+      return true;
+    }
+
+    //Check whether the have the same schema
+    if(!OperatorUtils.sameRowSchema(this, this.getParentOperators().get(0))) {
+      return false;
+    }
+
+    return true;
+  }
+
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsNoJobTask.java Thu Oct 30 16:22:33 2014
@@ -153,7 +153,9 @@ public class StatsNoJobTask extends Task
                 partn.getInputFormatClass(), jc);
             InputSplit dummySplit = new FileSplit(file.getPath(), 0, 0,
                 new String[] { partn.getLocation() });
-            Object recordReader = inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
+            org.apache.hadoop.mapred.RecordReader<?, ?> recordReader =
+                (org.apache.hadoop.mapred.RecordReader<?, ?>)
+                inputFormat.getRecordReader(dummySplit, jc, Reporter.NULL);
             StatsProvidingRecordReader statsRR;
             if (recordReader instanceof StatsProvidingRecordReader) {
               statsRR = (StatsProvidingRecordReader) recordReader;
@@ -163,6 +165,7 @@ public class StatsNoJobTask extends Task
               numFiles += 1;
               statsAvailable = true;
             }
+            recordReader.close();
           }
         }
 
@@ -254,6 +257,7 @@ public class StatsNoJobTask extends Task
                 numFiles += 1;
                 statsAvailable = true;
               }
+              recordReader.close();
             }
           }
 

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Thu Oct 30 16:22:33 2014
@@ -80,7 +80,7 @@ public class UnionOperator extends Opera
     for (int p = 0; p < parents; p++) {
       assert (parentFields[p].size() == columns);
       for (int c = 0; c < columns; c++) {
-        if (!columnTypeResolvers[c].update(parentFields[p].get(c)
+        if (!columnTypeResolvers[c].updateForUnionAll(parentFields[p].get(c)
             .getFieldObjectInspector())) {
           // checked in SemanticAnalyzer. Should not happen
           throw new HiveException("Incompatible types for union operator");

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Thu Oct 30 16:22:33 2014
@@ -416,7 +416,7 @@ public final class Utilities {
         }
         gWorkMap.put(path, gWork);
       } else {
-        LOG.debug("Found plan in cache.");
+        LOG.debug("Found plan in cache for name: " + name);
         gWork = gWorkMap.get(path);
       }
       return gWork;
@@ -437,20 +437,20 @@ public final class Utilities {
     }
   }
 
-  public static Map<String, Map<Integer, String>> getScratchColumnVectorTypes(Configuration hiveConf) {
+  public static Map<String, Map<Integer, String>> getAllScratchColumnVectorTypeMaps(Configuration hiveConf) {
     BaseWork baseWork = getMapWork(hiveConf);
     if (baseWork == null) {
       baseWork = getReduceWork(hiveConf);
     }
-    return baseWork.getScratchColumnVectorTypes();
+    return baseWork.getAllScratchColumnVectorTypeMaps();
   }
 
-  public static Map<String, Map<String, Integer>> getScratchColumnMap(Configuration hiveConf) {
+  public static Map<String, Map<String, Integer>> getAllColumnVectorMaps(Configuration hiveConf) {
     BaseWork baseWork = getMapWork(hiveConf);
     if (baseWork == null) {
       baseWork = getReduceWork(hiveConf);
     }
-    return baseWork.getScratchColumnMap();
+    return baseWork.getAllColumnVectorMaps();
   }
 
   public static void setWorkflowAdjacencies(Configuration conf, QueryPlan plan) {
@@ -1635,12 +1635,13 @@ public final class Utilities {
    * Group 6: copy     [copy keyword]
    * Group 8: 2        [copy file index]
    */
+  private static final String COPY_KEYWORD = "_copy_"; // copy keyword
   private static final Pattern COPY_FILE_NAME_TO_TASK_ID_REGEX =
       Pattern.compile("^.*?"+ // any prefix
                       "([0-9]+)"+ // taskId
                       "(_)"+ // separator
                       "([0-9]{1,6})?"+ // attemptId (limited to 6 digits)
-                      "((_)(\\Bcopy\\B)(_)"+ // copy keyword
+                      "((_)(\\Bcopy\\B)(_)" +
                       "([0-9]{1,6})$)?"+ // copy file index
                       "(\\..*)?$"); // any suffix/file extension
 
@@ -2035,6 +2036,15 @@ public final class Utilities {
     return false;
   }
 
+  public static String getBucketFileNameFromPathSubString(String bucketName) {
+    try {
+      return bucketName.split(COPY_KEYWORD)[0];
+    } catch (Exception e) {
+      e.printStackTrace();
+      return bucketName;
+    }
+  }
+
   public static String getNameMessage(Exception e) {
     return e.getClass().getName() + "(" + e.getMessage() + ")";
   }
@@ -2067,15 +2077,21 @@ public final class Utilities {
   public static ClassLoader getSessionSpecifiedClassLoader() {
     SessionState state = SessionState.get();
     if (state == null || state.getConf() == null) {
-      LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Hive Conf not found or Session not initiated, use thread based class loader instead");
+      }
       return JavaUtils.getClassLoader();
     }
     ClassLoader sessionCL = state.getConf().getClassLoader();
-    if (sessionCL != null){
-      LOG.debug("Use session specified class loader");
+    if (sessionCL != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Use session specified class loader");
+      }
       return sessionCL;
     }
-    LOG.debug("Session specified class loader not found, use thread based class loader");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Session specified class loader not found, use thread based class loader");
+    }
     return JavaUtils.getClassLoader();
   }
 
@@ -2363,6 +2379,32 @@ public final class Utilities {
     }
   }
 
+  /**
+   * Copies the storage handler proeprites configured for a table descriptor to a runtime job
+   * configuration.  This differs from {@link #copyTablePropertiesToConf(org.apache.hadoop.hive.ql.plan.TableDesc, org.apache.hadoop.mapred.JobConf)}
+   * in that it does not allow parameters already set in the job to override the values from the
+   * table.  This is important for setting the config up for reading,
+   * as the job may already have values in it from another table.
+   * @param tbl
+   * @param job
+   */
+  public static void copyTablePropertiesToConf(TableDesc tbl, JobConf job) {
+    Properties tblProperties = tbl.getProperties();
+    for(String name: tblProperties.stringPropertyNames()) {
+      String val = (String) tblProperties.get(name);
+      if (val != null) {
+        job.set(name, StringEscapeUtils.escapeJava(val));
+      }
+    }
+    Map<String, String> jobProperties = tbl.getJobProperties();
+    if (jobProperties == null) {
+      return;
+    }
+    for (Map.Entry<String, String> entry : jobProperties.entrySet()) {
+      job.set(entry.getKey(), entry.getValue());
+    }
+  }
+
   private static final Object INPUT_SUMMARY_LOCK = new Object();
 
   /**

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Thu Oct 30 16:22:33 2014
@@ -56,6 +56,8 @@ import org.apache.hadoop.hive.ql.exec.Pa
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionState;
+import org.apache.hadoop.hive.ql.exec.tez.TezSessionPoolManager;
 import org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormatImpl;
@@ -416,6 +418,13 @@ public class ExecDriver extends Task<Map
       Utilities.createTmpDirs(job, mWork);
       Utilities.createTmpDirs(job, rWork);
 
+      SessionState ss = SessionState.get();
+      if (HiveConf.getVar(job, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")
+          && ss != null) {
+        TezSessionState session = ss.getTezSession();
+        TezSessionPoolManager.getInstance().close(session, true);
+      }
+
       // Finally SUBMIT the JOB!
       rj = jc.submitJob(job);
       // replace it back

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Thu Oct 30 16:22:33 2014
@@ -26,8 +26,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.exec.FetchOperator;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
 import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -72,9 +75,6 @@ public class ExecMapper extends MapReduc
   private static boolean done;
 
   // used to log memory usage periodically
-  public static MemoryMXBean memoryMXBean;
-  private long numRows = 0;
-  private long nextCntr = 1;
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
 
@@ -84,8 +84,6 @@ public class ExecMapper extends MapReduc
   public void configure(JobConf job) {
     execContext = new ExecMapperContext(job);
     // Allocate the bean at the beginning -
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
 
     isLogInfoEnabled = l4j.isInfoEnabled();
 
@@ -176,15 +174,6 @@ public class ExecMapper extends MapReduc
         // Since there is no concept of a group, we don't invoke
         // startGroup/endGroup for a mapper
         mo.process((Writable)value);
-        if (isLogInfoEnabled) {
-          numRows++;
-          if (numRows == nextCntr) {
-            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            l4j.info("ExecMapper: processing " + numRows
-                + " rows: used memory = " + used_memory);
-            nextCntr = getNextCntr(numRows);
-          }
-        }
       }
     } catch (Throwable e) {
       abort = true;
@@ -198,18 +187,6 @@ public class ExecMapper extends MapReduc
     }
   }
 
-
-  private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
   @Override
   public void close() {
     // No row was processed
@@ -245,13 +222,7 @@ public class ExecMapper extends MapReduc
         }
       }
 
-      if (isLogInfoEnabled) {
-        long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-        l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
-            + used_memory);
-      }
-
-      ReportStats rps = new ReportStats(rp);
+      ReportStats rps = new ReportStats(rp, jc);
       mo.preorderMap(rps);
       return;
     } catch (Exception e) {
@@ -288,17 +259,21 @@ public class ExecMapper extends MapReduc
    */
   public static class ReportStats implements Operator.OperatorFunc {
     private final Reporter rp;
+    private final Configuration conf;
+    private final String groupName;
 
-    public ReportStats(Reporter rp) {
+    public ReportStats(Reporter rp, Configuration conf) {
       this.rp = rp;
+      this.conf = conf;
+      this.groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
     }
 
     @Override
     public void func(Operator op) {
-      Map<Enum<?>, Long> opStats = op.getStats();
-      for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
+      Map<String, Long> opStats = op.getStats();
+      for (Map.Entry<String, Long> e : opStats.entrySet()) {
         if (rp != null) {
-          rp.incrCounter(e.getKey(), e.getValue());
+          rp.incrCounter(groupName, e.getKey(), e.getValue());
         }
       }
     }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Thu Oct 30 16:22:33 2014
@@ -70,8 +70,6 @@ public class ExecReducer extends MapRedu
   private static final boolean isTraceEnabled = LOG.isTraceEnabled();
   private static final String PLAN_KEY = "__REDUCE_PLAN__";
 
-  // used to log memory usage periodically
-  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
   // Input value serde needs to be an array to support different SerDe
   // for different tags
   private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
@@ -86,8 +84,6 @@ public class ExecReducer extends MapRedu
   private Reporter rp;
   private boolean abort = false;
   private boolean isTagged = false;
-  private long cntr = 0;
-  private long nextCntr = 1;
   private TableDesc keyTableDesc;
   private TableDesc[] valueTableDesc;
   private ObjectInspector[] rowObjectInspector;
@@ -103,8 +99,6 @@ public class ExecReducer extends MapRedu
     ObjectInspector keyObjectInspector;
 
     if (isInfoEnabled) {
-      LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
       try {
         LOG.info("conf classpath = "
             + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
@@ -245,17 +239,7 @@ public class ExecReducer extends MapRedu
         row.clear();
         row.add(keyObject);
         row.add(valueObject[tag]);
-        if (isInfoEnabled) {
-          cntr++;
-          if (cntr == nextCntr) {
-            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            if (isInfoEnabled) {
-              LOG.info("ExecReducer: processing " + cntr
-                  + " rows: used memory = " + used_memory);
-            }
-            nextCntr = getNextCntr(cntr);
-          }
-        }
+
         try {
           reducer.processOp(row, tag);
         } catch (Exception e) {
@@ -283,17 +267,6 @@ public class ExecReducer extends MapRedu
     }
   }
 
-  private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
-  }
-
   @Override
   public void close() {
 
@@ -310,13 +283,9 @@ public class ExecReducer extends MapRedu
         }
         reducer.endGroup();
       }
-      if (isInfoEnabled) {
-        LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
-            + memoryMXBean.getHeapMemoryUsage().getUsed());
-      }
 
       reducer.close(abort);
-      ReportStats rps = new ReportStats(rp);
+      ReportStats rps = new ReportStats(rp, jc);
       reducer.preorderMap(rps);
 
     } catch (Exception e) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java Thu Oct 30 16:22:33 2014
@@ -238,6 +238,18 @@ public class MapredLocalTask extends Tas
         variables.put(HADOOP_OPTS_KEY, hadoopOpts);
       }
 
+      //For Windows OS, we need to pass HIVE_HADOOP_CLASSPATH Java parameter while starting
+      //Hiveserver2 using "-hiveconf hive.hadoop.classpath=%HIVE_LIB%". This is to combine path(s).
+      if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH)!= null)
+      {
+        if (variables.containsKey("HADOOP_CLASSPATH"))
+        {
+          variables.put("HADOOP_CLASSPATH", variables.get("HADOOP_CLASSPATH") + ";" + HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH));
+        } else {
+          variables.put("HADOOP_CLASSPATH", HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_HADOOP_CLASSPATH));
+        }
+      }
+
       if(variables.containsKey(MapRedTask.HIVE_DEBUG_RECURSIVE)) {
         MapRedTask.configureDebugVariablesForChildJVM(variables);
       }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionVertex.java Thu Oct 30 16:22:33 2014
@@ -21,16 +21,23 @@ package org.apache.hadoop.hive.ql.exec.t
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 
+import com.google.common.collect.LinkedListMultimap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -38,6 +45,7 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.split.TezGroupedSplit;
 import org.apache.hadoop.mapreduce.split.TezMapReduceSplitsGrouper;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
@@ -67,17 +75,41 @@ import com.google.common.collect.Multima
 import com.google.protobuf.ByteString;
 
 /*
- * Only works with old mapred API
- * Will only work with a single MRInput for now.
+ * This is the central piece for Bucket Map Join and SMB join. It has the following
+ * responsibilities:
+ * 1. Group incoming splits based on bucketing.
+ * 2. Generate new serialized events for the grouped splits.
+ * 3. Create a routing table for the bucket map join and send a serialized version as payload
+ * for the EdgeManager.
+ * 4. For SMB join, generate a grouping according to bucketing for the "small" table side.
  */
 public class CustomPartitionVertex extends VertexManagerPlugin {
 
+  public class PathComparatorForSplit implements Comparator<InputSplit> {
+
+    @Override
+    public int compare(InputSplit inp1, InputSplit inp2) {
+      FileSplit fs1 = (FileSplit) inp1;
+      FileSplit fs2 = (FileSplit) inp2;
+
+      int retval = fs1.getPath().compareTo(fs2.getPath());
+      if (retval != 0) {
+        return retval;
+      }
+
+      if (fs1.getStart() != fs2.getStart()) {
+        return (int) (fs1.getStart() - fs2.getStart());
+      }
+
+      return 0;
+    }
+  }
+
   private static final Log LOG = LogFactory.getLog(CustomPartitionVertex.class.getName());
 
   VertexManagerPluginContext context;
 
   private InputConfigureVertexTasksEvent configureVertexTaskEvent;
-  private List<InputDataInformationEvent> dataInformationEvents;
   private int numBuckets = -1;
   private Configuration conf = null;
   private final SplitGrouper grouper = new SplitGrouper();
@@ -89,6 +121,13 @@ public class CustomPartitionVertex exten
   private final Map<String, Multimap<Integer, InputSplit>> inputToGroupedSplitMap =
       new HashMap<String, Multimap<Integer, InputSplit>>();
 
+  private int numInputsAffectingRootInputSpecUpdate = 1;
+  private int numInputsSeenSoFar = 0;
+  private final Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
+  private final List<InputSplit> finalSplits = Lists.newLinkedList();
+  private final Map<String, InputSpecUpdate> inputNameInputSpecMap =
+      new HashMap<String, InputSpecUpdate>();
+
   public CustomPartitionVertex(VertexManagerPluginContext context) {
     super(context);
   }
@@ -108,12 +147,13 @@ public class CustomPartitionVertex exten
     this.numBuckets = vertexConf.getNumBuckets();
     this.mainWorkName = vertexConf.getInputName();
     this.vertexType = vertexConf.getVertexType();
+    this.numInputsAffectingRootInputSpecUpdate = vertexConf.getNumInputs();
   }
 
   @Override
   public void onVertexStarted(Map<String, List<Integer>> completions) {
     int numTasks = context.getVertexNumTasks(context.getVertexName());
-    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks = 
+    List<VertexManagerPluginContext.TaskWithLocationHint> scheduledTasks =
       new ArrayList<VertexManagerPluginContext.TaskWithLocationHint>(numTasks);
     for (int i = 0; i < numTasks; ++i) {
       scheduledTasks.add(new VertexManagerPluginContext.TaskWithLocationHint(new Integer(i), null));
@@ -133,8 +173,8 @@ public class CustomPartitionVertex exten
   @Override
   public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
       List<Event> events) {
+    numInputsSeenSoFar++;
     LOG.info("On root vertex initialized " + inputName);
-
     try {
       // This is using the payload from the RootVertexInitializer corresponding
       // to InputName. Ideally it should be using it's own configuration class -
@@ -168,20 +208,21 @@ public class CustomPartitionVertex exten
     }
 
     boolean dataInformationEventSeen = false;
-    Map<String, List<FileSplit>> pathFileSplitsMap = new TreeMap<String, List<FileSplit>>();
+    Map<String, Set<FileSplit>> pathFileSplitsMap = new TreeMap<String, Set<FileSplit>>();
 
     for (Event event : events) {
       if (event instanceof InputConfigureVertexTasksEvent) {
         // No tasks should have been started yet. Checked by initial state
         // check.
+        LOG.info("Got a input configure vertex event for input: " + inputName);
         Preconditions.checkState(dataInformationEventSeen == false);
         InputConfigureVertexTasksEvent cEvent = (InputConfigureVertexTasksEvent) event;
 
         // The vertex cannot be configured until all DataEvents are seen - to
         // build the routing table.
         configureVertexTaskEvent = cEvent;
-        dataInformationEvents =
-            Lists.newArrayListWithCapacity(configureVertexTaskEvent.getNumTasks());
+        LOG.info("Configure task for input name: " + inputName + " num tasks: "
+            + configureVertexTaskEvent.getNumTasks());
       }
       if (event instanceof InputUpdatePayloadEvent) {
         // this event can never occur. If it does, fail.
@@ -189,22 +230,26 @@ public class CustomPartitionVertex exten
       } else if (event instanceof InputDataInformationEvent) {
         dataInformationEventSeen = true;
         InputDataInformationEvent diEvent = (InputDataInformationEvent) event;
-        dataInformationEvents.add(diEvent);
         FileSplit fileSplit;
         try {
           fileSplit = getFileSplitFromEvent(diEvent);
         } catch (IOException e) {
           throw new RuntimeException("Failed to get file split for event: " + diEvent);
         }
-        List<FileSplit> fsList = pathFileSplitsMap.get(fileSplit.getPath().getName());
+        Set<FileSplit> fsList =
+            pathFileSplitsMap.get(Utilities.getBucketFileNameFromPathSubString(fileSplit.getPath()
+                .getName()));
         if (fsList == null) {
-          fsList = new ArrayList<FileSplit>();
-          pathFileSplitsMap.put(fileSplit.getPath().getName(), fsList);
+          fsList = new TreeSet<FileSplit>(new PathComparatorForSplit());
+          pathFileSplitsMap.put(
+              Utilities.getBucketFileNameFromPathSubString(fileSplit.getPath().getName()), fsList);
         }
         fsList.add(fileSplit);
       }
     }
 
+    LOG.info("Path file splits map for input name: " + inputName + " is " + pathFileSplitsMap);
+
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         getBucketSplitMapForPath(pathFileSplitsMap);
 
@@ -217,77 +262,144 @@ public class CustomPartitionVertex exten
 
       int availableSlots = totalResource / taskResource;
 
-      LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves + " waves.");
+      LOG.info("Grouping splits. " + availableSlots + " available slots, " + waves
+          + " waves. Bucket initial splits map: " + bucketToInitialSplitMap);
       JobConf jobConf = new JobConf(conf);
       ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
 
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap =
           HashMultimap.<Integer, InputSplit> create();
-      for (Integer key : bucketToInitialSplitMap.keySet()) {
-        InputSplit[] inputSplitArray =
-            (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
-        Multimap<Integer, InputSplit> groupedSplit =
-            HiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
-                availableSlots, inputName);
-        bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
-      }
-
-      LOG.info("We have grouped the splits into " + bucketToGroupedSplitMap);
-      if ((mainWorkName.isEmpty() == false) && (mainWorkName.compareTo(inputName) != 0)) {
+      boolean secondLevelGroupingDone = false;
+      if ((mainWorkName.isEmpty()) || (inputName.compareTo(mainWorkName) == 0)) {
+        for (Integer key : bucketToInitialSplitMap.keySet()) {
+          InputSplit[] inputSplitArray =
+              (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+          HiveSplitGenerator hiveSplitGenerator = new HiveSplitGenerator();
+          Multimap<Integer, InputSplit> groupedSplit =
+              hiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+                  availableSlots, inputName, mainWorkName.isEmpty());
+          if (mainWorkName.isEmpty() == false) {
+            Multimap<Integer, InputSplit> singleBucketToGroupedSplit =
+                HashMultimap.<Integer, InputSplit> create();
+            singleBucketToGroupedSplit.putAll(key, groupedSplit.values());
+            groupedSplit =
+                grouper.group(jobConf, singleBucketToGroupedSplit, availableSlots,
+                    HiveConf.getFloatVar(conf, HiveConf.ConfVars.TEZ_SMB_NUMBER_WAVES));
+            secondLevelGroupingDone = true;
+          }
+          bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+        }
+        processAllEvents(inputName, bucketToGroupedSplitMap, secondLevelGroupingDone);
+      } else {
+        // do not group across files in case of side work because there is only 1 KV reader per
+        // grouped split. This would affect SMB joins where we want to find the smallest key in
+        // all the bucket files.
+        for (Integer key : bucketToInitialSplitMap.keySet()) {
+          HiveSplitGenerator hiveSplitGenerator = new HiveSplitGenerator();
+          InputSplit[] inputSplitArray =
+              (bucketToInitialSplitMap.get(key).toArray(new InputSplit[0]));
+          Multimap<Integer, InputSplit> groupedSplit =
+              hiveSplitGenerator.generateGroupedSplits(jobConf, conf, inputSplitArray, waves,
+                    availableSlots, inputName, false);
+            bucketToGroupedSplitMap.putAll(key, groupedSplit.values());
+        }
         /*
-         * this is the small table side. In case of SMB join, we may need to send each split to the
+         * this is the small table side. In case of SMB join, we need to send each split to the
          * corresponding bucket-based task on the other side. In case a split needs to go to
          * multiple downstream tasks, we need to clone the event and send it to the right
          * destination.
          */
-        processAllSideEvents(inputName, bucketToGroupedSplitMap);
-      } else {
-        processAllEvents(inputName, bucketToGroupedSplitMap);
+        LOG.info("This is the side work - multi-mr work.");
+        processAllSideEventsSetParallelism(inputName, bucketToGroupedSplitMap);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
   }
 
-  private void processAllSideEvents(String inputName,
+  private void processAllSideEventsSetParallelism(String inputName,
       Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
     // the bucket to task map should have been setup by the big table.
+    LOG.info("Processing events for input " + inputName);
     if (bucketToTaskMap.isEmpty()) {
+      LOG.info("We don't have a routing table yet. Will need to wait for the main input"
+          + " initialization");
       inputToGroupedSplitMap.put(inputName, bucketToGroupedSplitMap);
       return;
     }
+    processAllSideEvents(inputName, bucketToGroupedSplitMap);
+    setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
+  }
+
+  private void processAllSideEvents(String inputName,
+      Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
     List<InputDataInformationEvent> taskEvents = new ArrayList<InputDataInformationEvent>();
+    LOG.info("We have a routing table and we are going to set the destination tasks for the"
+        + " multi mr inputs. " + bucketToTaskMap);
+
+    Integer[] numSplitsForTask = new Integer[taskCount];
+
+    Multimap<Integer, ByteBuffer> bucketToSerializedSplitMap = LinkedListMultimap.create();
+
+    // Create the list of serialized splits for each bucket.
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
+      for (InputSplit split : entry.getValue()) {
+        MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+        ByteBuffer bs = serializedSplit.toByteString().asReadOnlyByteBuffer();
+        bucketToSerializedSplitMap.put(entry.getKey(), bs);
+      }
+    }
+
+    for (Entry<Integer, Collection<ByteBuffer>> entry : bucketToSerializedSplitMap.asMap().entrySet()) {
       Collection<Integer> destTasks = bucketToTaskMap.get(entry.getKey());
       for (Integer task : destTasks) {
-        for (InputSplit split : entry.getValue()) {
-          MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(split);
+        int count = 0;
+        for (ByteBuffer buf : entry.getValue()) {
+          count++;
           InputDataInformationEvent diEvent =
-              InputDataInformationEvent.createWithSerializedPayload(task, serializedSplit
-                  .toByteString().asReadOnlyByteBuffer());
+              InputDataInformationEvent.createWithSerializedPayload(count, buf);
           diEvent.setTargetIndex(task);
           taskEvents.add(diEvent);
         }
+        numSplitsForTask[task] = count;
       }
     }
 
+    inputNameInputSpecMap.put(inputName,
+        InputSpecUpdate.createPerTaskInputSpecUpdate(Arrays.asList(numSplitsForTask)));
+
+    LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size());
+
     context.addRootInputEvents(inputName, taskEvents);
   }
 
   private void processAllEvents(String inputName,
-      Multimap<Integer, InputSplit> bucketToGroupedSplitMap) throws IOException {
+      Multimap<Integer, InputSplit> bucketToGroupedSplitMap, boolean secondLevelGroupingDone)
+      throws IOException {
 
-    List<InputSplit> finalSplits = Lists.newLinkedList();
+    int totalInputsCount = 0;
+    List<Integer> numSplitsForTask = new ArrayList<Integer>();
     for (Entry<Integer, Collection<InputSplit>> entry : bucketToGroupedSplitMap.asMap().entrySet()) {
       int bucketNum = entry.getKey();
       Collection<InputSplit> initialSplits = entry.getValue();
       finalSplits.addAll(initialSplits);
-      for (int i = 0; i < initialSplits.size(); i++) {
+      for (InputSplit inputSplit : initialSplits) {
         bucketToTaskMap.put(bucketNum, taskCount);
+        if (secondLevelGroupingDone) {
+          TezGroupedSplit groupedSplit = (TezGroupedSplit) inputSplit;
+          numSplitsForTask.add(groupedSplit.getGroupedSplits().size());
+          totalInputsCount += groupedSplit.getGroupedSplits().size();
+        } else {
+          numSplitsForTask.add(1);
+          totalInputsCount += 1;
+        }
         taskCount++;
       }
     }
 
+    inputNameInputSpecMap.put(inputName,
+        InputSpecUpdate.createPerTaskInputSpecUpdate(numSplitsForTask));
+
     // Construct the EdgeManager descriptor to be used by all edges which need
     // the routing table.
     EdgeManagerPluginDescriptor hiveEdgeManagerDesc = null;
@@ -297,7 +409,6 @@ public class CustomPartitionVertex exten
       UserPayload payload = getBytePayload(bucketToTaskMap);
       hiveEdgeManagerDesc.setUserPayload(payload);
     }
-    Map<String, EdgeManagerPluginDescriptor> emMap = Maps.newHashMap();
 
     // Replace the edge manager for all vertices which have routing type custom.
     for (Entry<String, EdgeProperty> edgeEntry : context.getInputVertexEdgeProperties().entrySet()) {
@@ -308,42 +419,67 @@ public class CustomPartitionVertex exten
       }
     }
 
-    LOG.info("Task count is " + taskCount);
+    LOG.info("Task count is " + taskCount + " for input name: " + inputName);
 
-    List<InputDataInformationEvent> taskEvents =
-        Lists.newArrayListWithCapacity(finalSplits.size());
+    List<InputDataInformationEvent> taskEvents = Lists.newArrayListWithCapacity(totalInputsCount);
     // Re-serialize the splits after grouping.
     int count = 0;
     for (InputSplit inputSplit : finalSplits) {
-      MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
-      InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload(
-          count, serializedSplit.toByteString().asReadOnlyByteBuffer());
-      diEvent.setTargetIndex(count);
+      if (secondLevelGroupingDone) {
+        TezGroupedSplit tezGroupedSplit = (TezGroupedSplit)inputSplit;
+        for (InputSplit subSplit : tezGroupedSplit.getGroupedSplits()) {
+          if ((subSplit instanceof TezGroupedSplit) == false) {
+            throw new IOException("Unexpected split type found: "
+                + subSplit.getClass().getCanonicalName());
+          }
+          MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(subSplit);
+          InputDataInformationEvent diEvent =
+              InputDataInformationEvent.createWithSerializedPayload(count, serializedSplit
+                  .toByteString().asReadOnlyByteBuffer());
+          diEvent.setTargetIndex(count);
+          taskEvents.add(diEvent);
+        }
+      } else {
+        MRSplitProto serializedSplit = MRInputHelpers.createSplitProto(inputSplit);
+        InputDataInformationEvent diEvent =
+            InputDataInformationEvent.createWithSerializedPayload(count, serializedSplit
+                .toByteString().asReadOnlyByteBuffer());
+        diEvent.setTargetIndex(count);
+        taskEvents.add(diEvent);
+      }
       count++;
-      taskEvents.add(diEvent);
-    }
-
-    // Replace the Edge Managers
-    Map<String, InputSpecUpdate> rootInputSpecUpdate =
-      new HashMap<String, InputSpecUpdate>();
-    rootInputSpecUpdate.put(
-        inputName,
-        InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
-    if ((mainWorkName.compareTo(inputName) == 0) || (mainWorkName.isEmpty())) {
-      context.setVertexParallelism(
-          taskCount,
-          VertexLocationHint.create(grouper.createTaskLocationHints(finalSplits
-              .toArray(new InputSplit[finalSplits.size()]))), emMap, rootInputSpecUpdate);
     }
 
     // Set the actual events for the tasks.
+    LOG.info("For input name: " + inputName + " task events size is " + taskEvents.size());
     context.addRootInputEvents(inputName, taskEvents);
     if (inputToGroupedSplitMap.isEmpty() == false) {
       for (Entry<String, Multimap<Integer, InputSplit>> entry : inputToGroupedSplitMap.entrySet()) {
         processAllSideEvents(entry.getKey(), entry.getValue());
       }
+      setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
       inputToGroupedSplitMap.clear();
     }
+
+    // Only done when it is a bucket map join only no SMB.
+    if (numInputsAffectingRootInputSpecUpdate == 1) {
+      setVertexParallelismAndRootInputSpec(inputNameInputSpecMap);
+    }
+  }
+
+  private void
+      setVertexParallelismAndRootInputSpec(Map<String, InputSpecUpdate> rootInputSpecUpdate)
+          throws IOException {
+    if (numInputsAffectingRootInputSpecUpdate != numInputsSeenSoFar) {
+      return;
+    }
+
+    LOG.info("Setting vertex parallelism since we have seen all inputs.");
+
+    context.setVertexParallelism(taskCount, VertexLocationHint.create(grouper
+        .createTaskLocationHints(finalSplits.toArray(new InputSplit[finalSplits.size()]))), emMap,
+        rootInputSpecUpdate);
+    finalSplits.clear();
   }
 
   UserPayload getBytePayload(Multimap<Integer, Integer> routingTable) throws IOException {
@@ -377,14 +513,14 @@ public class CustomPartitionVertex exten
    * This method generates the map of bucket to file splits.
    */
   private Multimap<Integer, InputSplit> getBucketSplitMapForPath(
-      Map<String, List<FileSplit>> pathFileSplitsMap) {
+      Map<String, Set<FileSplit>> pathFileSplitsMap) {
 
     int bucketNum = 0;
 
     Multimap<Integer, InputSplit> bucketToInitialSplitMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
-    for (Map.Entry<String, List<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
+    for (Map.Entry<String, Set<FileSplit>> entry : pathFileSplitsMap.entrySet()) {
       int bucketId = bucketNum % numBuckets;
       for (FileSplit fsplit : entry.getValue()) {
         bucketToInitialSplitMap.put(bucketId, fsplit);
@@ -392,6 +528,11 @@ public class CustomPartitionVertex exten
       bucketNum++;
     }
 
+    // this is just for SMB join use-case. The numBuckets would be equal to that of the big table
+    // and the small table could have lesser number of buckets. In this case, we want to send the
+    // data from the right buckets to the big table side. For e.g. Big table has 8 buckets and small
+    // table has 4 buckets, bucket 0 of small table needs to be sent to bucket 4 of the big table as
+    // well.
     if (bucketNum < numBuckets) {
       int loopedBucketId = 0;
       for (; bucketNum < numBuckets; bucketNum++) {

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomVertexConfiguration.java Thu Oct 30 16:22:33 2014
@@ -29,20 +29,31 @@ import org.apache.hadoop.io.Writable;
  * This class is the payload for custom vertex. It serializes and de-serializes
  * @numBuckets: the number of buckets of the "big table"
  * @vertexType: this is the type of vertex and differentiates between bucket map join and SMB joins
- * @inputName: This is the name of the input. Used in case of SMB joins
+ * @numInputs: The number of inputs that are directly connected to the vertex (MRInput/MultiMRInput).
+ *             In case of bucket map join, it is always 1.
+ * @inputName: This is the name of the input. Used in case of SMB joins. Empty in case of BucketMapJoin
  */
 public class CustomVertexConfiguration implements Writable {
 
   private int numBuckets;
   private VertexType vertexType = VertexType.AUTO_INITIALIZED_EDGES;
+  private int numInputs;
   private String inputName;
 
   public CustomVertexConfiguration() {
   }
 
-  public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName) {
+  // this is the constructor to use for the Bucket map join case.
+  public CustomVertexConfiguration(int numBuckets, VertexType vertexType) {
+    this(numBuckets, vertexType, "", 1);
+  }
+
+  // this is the constructor to use for SMB.
+  public CustomVertexConfiguration(int numBuckets, VertexType vertexType, String inputName,
+      int numInputs) {
     this.numBuckets = numBuckets;
     this.vertexType = vertexType;
+    this.numInputs = numInputs;
     this.inputName = inputName;
   }
 
@@ -50,6 +61,7 @@ public class CustomVertexConfiguration i
   public void write(DataOutput out) throws IOException {
     out.writeInt(this.vertexType.ordinal());
     out.writeInt(this.numBuckets);
+    out.writeInt(numInputs);
     out.writeUTF(inputName);
   }
 
@@ -57,6 +69,7 @@ public class CustomVertexConfiguration i
   public void readFields(DataInput in) throws IOException {
     this.vertexType = VertexType.values()[in.readInt()];
     this.numBuckets = in.readInt();
+    this.numInputs = in.readInt();
     this.inputName = in.readUTF();
   }
 
@@ -71,4 +84,8 @@ public class CustomVertexConfiguration i
   public String getInputName() {
     return inputName;
   }
+
+  public int getNumInputs() {
+    return numInputs;
+  }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Thu Oct 30 16:22:33 2014
@@ -20,8 +20,6 @@ package org.apache.hadoop.hive.ql.exec.t
 import com.google.common.base.Function;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
 import javax.security.auth.login.LoginException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -49,7 +47,7 @@ import org.apache.hadoop.hive.conf.HiveC
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -110,16 +108,13 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
 import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
-import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRInputHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.mapreduce.protos.MRRuntimeProtos;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.comparator.TezBytesComparator;
 import org.apache.tez.runtime.library.common.serializer.TezBytesWritableSerialization;
@@ -178,6 +173,8 @@ public class DagUtils {
   private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork mapWork) {
     JobConf conf = new JobConf(baseConf);
 
+    conf.set(Operator.CONTEXT_NAME_KEY, mapWork.getName());
+
     if (mapWork.getNumMapTasks() != null) {
       // Is this required ?
       conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
@@ -268,8 +265,7 @@ public class DagUtils {
     case CUSTOM_EDGE: {
       mergeInputClass = ConcatenatedMergedKeyValueInput.class;
       int numBuckets = edgeProp.getNumBuckets();
-      CustomVertexConfiguration vertexConf =
-          new CustomVertexConfiguration(numBuckets, vertexType, "");
+      CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
       DataOutputBuffer dob = new DataOutputBuffer();
       vertexConf.write(dob);
       VertexManagerPluginDescriptor desc =
@@ -314,8 +310,7 @@ public class DagUtils {
     switch(edgeProp.getEdgeType()) {
     case CUSTOM_EDGE: {
       int numBuckets = edgeProp.getNumBuckets();
-      CustomVertexConfiguration vertexConf =
-          new CustomVertexConfiguration(numBuckets, vertexType, "");
+      CustomVertexConfiguration vertexConf = new CustomVertexConfiguration(numBuckets, vertexType);
       DataOutputBuffer dob = new DataOutputBuffer();
       vertexConf.write(dob);
       VertexManagerPluginDescriptor desc = VertexManagerPluginDescriptor.create(
@@ -340,7 +335,6 @@ public class DagUtils {
   /*
    * Helper function to create an edge property from an edge type.
    */
-  @SuppressWarnings("rawtypes")
   private EdgeProperty createEdgeProperty(TezEdgeProperty edgeProp, Configuration conf)
       throws IOException {
     MRHelpers.translateMRConfToTez(conf);
@@ -431,8 +425,9 @@ public class DagUtils {
     int memory = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0 ?
       HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) :
       conf.getInt(MRJobConfig.MAP_MEMORY_MB, MRJobConfig.DEFAULT_MAP_MEMORY_MB);
-    int cpus = conf.getInt(MRJobConfig.MAP_CPU_VCORES,
-        MRJobConfig.DEFAULT_MAP_CPU_VCORES);
+    int cpus = HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) > 0 ?
+      HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCPUVCORES) :
+      conf.getInt(MRJobConfig.MAP_CPU_VCORES, MRJobConfig.DEFAULT_MAP_CPU_VCORES);
     return Resource.newInstance(memory, cpus);
   }
 
@@ -452,17 +447,29 @@ public class DagUtils {
    */
   private String getContainerJavaOpts(Configuration conf) {
     String javaOpts = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZJAVAOPTS);
-    if (javaOpts != null && !javaOpts.isEmpty()) {
-      String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
-      List<String> logProps = Lists.newArrayList();
-      TezUtils.addLog4jSystemProperties(logLevel, logProps);
-      StringBuilder sb = new StringBuilder();
-      for (String str : logProps) {
-        sb.append(str).append(" ");
+
+    String logLevel = HiveConf.getVar(conf, HiveConf.ConfVars.HIVETEZLOGLEVEL);
+    List<String> logProps = Lists.newArrayList();
+    TezUtils.addLog4jSystemProperties(logLevel, logProps);
+    StringBuilder sb = new StringBuilder();
+    for (String str : logProps) {
+      sb.append(str).append(" ");
+    }
+    logLevel = sb.toString();
+
+    if (HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVETEZCONTAINERSIZE) > 0) {
+      if (javaOpts != null) {
+        return javaOpts + " " + logLevel;
+      } else  {
+        return logLevel;
+      }
+    } else {
+      if (javaOpts != null && !javaOpts.isEmpty()) {
+        LOG.warn(HiveConf.ConfVars.HIVETEZJAVAOPTS + " will be ignored because "
+                 + HiveConf.ConfVars.HIVETEZCONTAINERSIZE + " is not set!");
       }
-      return javaOpts + " " + sb.toString();
+      return logLevel + " " + MRHelpers.getJavaOptsForMRMapper(conf);
     }
-    return MRHelpers.getJavaOptsForMRMapper(conf);
   }
 
   private Vertex createVertex(JobConf conf, MergeJoinWork mergeJoinWork, LocalResource appJarLr,
@@ -473,13 +480,9 @@ public class DagUtils {
     if (mergeJoinWork.getMainWork() instanceof MapWork) {
       List<BaseWork> mapWorkList = mergeJoinWork.getBaseWorkList();
       MapWork mapWork = (MapWork) (mergeJoinWork.getMainWork());
-      CommonMergeJoinOperator mergeJoinOp = mergeJoinWork.getMergeJoinOperator();
       Vertex mergeVx =
           createVertex(conf, mapWork, appJarLr, additionalLr, fs, mrScratchDir, ctx, vertexType);
 
-      // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
-      // here would cause pre-mature grouping which would be incorrect.
-      Class inputFormatClass = HiveInputFormat.class;
       conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
       // mapreduce.tez.input.initializer.serialize.event.payload should be set
       // to false when using this plug-in to avoid getting a serialized event at run-time.
@@ -496,9 +499,11 @@ public class DagUtils {
 
       VertexManagerPluginDescriptor desc =
         VertexManagerPluginDescriptor.create(CustomPartitionVertex.class.getName());
+      // the +1 to the size is because of the main work.
       CustomVertexConfiguration vertexConf =
           new CustomVertexConfiguration(mergeJoinWork.getMergeJoinOperator().getConf()
-              .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias());
+              .getNumBuckets(), vertexType, mergeJoinWork.getBigTableAlias(),
+              mapWorkList.size() + 1);
       DataOutputBuffer dob = new DataOutputBuffer();
       vertexConf.write(dob);
       byte[] userPayload = dob.getData();
@@ -538,6 +543,7 @@ public class DagUtils {
     DataSourceDescriptor dataSource;
 
     int numTasks = -1;
+    @SuppressWarnings("rawtypes")
     Class inputFormatClass = conf.getClass("mapred.input.format.class",
         InputFormat.class);
 
@@ -595,7 +601,13 @@ public class DagUtils {
             .setCustomInitializerDescriptor(descriptor).build();
       } else {
         // Not HiveInputFormat, or a custom VertexManager will take care of grouping splits
-        dataSource = MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+        if (vertexHasCustomInput) {
+          dataSource =
+              MultiMRInput.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+        } else {
+          dataSource =
+              MRInputLegacy.createConfigBuilder(conf, inputFormatClass).groupSplits(false).build();
+        }
       }
     } else {
       // Setup client side split generation.
@@ -640,6 +652,8 @@ public class DagUtils {
   private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
+    conf.set(Operator.CONTEXT_NAME_KEY, reduceWork.getName());
+
     // Is this required ?
     conf.set("mapred.reducer.class", ExecReducer.class.getName());
 
@@ -745,6 +759,7 @@ public class DagUtils {
    * @throws LoginException if we are unable to figure user information
    * @throws IOException when any dfs operation fails.
    */
+  @SuppressWarnings("deprecation")
   public Path getDefaultDestDir(Configuration conf) throws LoginException, IOException {
     UserGroupInformation ugi = ShimLoader.getHadoopShims().getUGIForConf(conf);
     String userName = ShimLoader.getHadoopShims().getShortUserName(ugi);
@@ -857,6 +872,7 @@ public class DagUtils {
     return fstatus;
   }
 
+  @SuppressWarnings("deprecation")
   public static FileStatus validateTargetDir(Path path, Configuration conf) throws IOException {
     FileSystem fs = path.getFileSystem(conf);
     FileStatus fstatus = null;
@@ -971,7 +987,7 @@ public class DagUtils {
   public JobConf createConfiguration(HiveConf hiveConf) throws IOException {
     hiveConf.setBoolean("mapred.mapper.new-api", false);
 
-    JobConf conf = new JobConf(hiveConf);
+    JobConf conf = new JobConf(new TezConfiguration(hiveConf));
 
     conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
 
@@ -1033,6 +1049,7 @@ public class DagUtils {
    * @param ctx This query's context
    * @return Vertex
    */
+  @SuppressWarnings("deprecation")
   public Vertex createVertex(JobConf conf, BaseWork work,
       Path scratchDir, LocalResource appJarLr,
       List<LocalResource> additionalLr, FileSystem fileSystem, Context ctx, boolean hasChildren,

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DynamicPartitionPruner.java Thu Oct 30 16:22:33 2014
@@ -87,6 +87,8 @@ public class DynamicPartitionPruner {
 
   private final Object endOfEvents = new Object();
 
+  private int totalEventCount = 0;
+
   public DynamicPartitionPruner() {
   }
 
@@ -114,7 +116,7 @@ public class DynamicPartitionPruner {
     // synchronous event processing loop. Won't return until all events have
     // been processed.
     this.processEvents();
-    this.prunePartitions(work);
+    this.prunePartitions(work, context);
     LOG.info("Ok to proceed.");
   }
 
@@ -163,12 +165,22 @@ public class DynamicPartitionPruner {
     }
   }
 
-  private void prunePartitions(MapWork work) throws HiveException {
+  private void prunePartitions(MapWork work, InputInitializerContext context) throws HiveException {
+    int expectedEvents = 0;
     for (String source : this.sourceInfoMap.keySet()) {
       for (SourceInfo si : this.sourceInfoMap.get(source)) {
+        int taskNum = context.getVertexNumTasks(source);
+        LOG.info("Expecting " + taskNum + " events for vertex " + source);
+        expectedEvents += taskNum;
         prunePartitionSingleSource(source, si, work);
       }
     }
+
+    // sanity check. all tasks must submit events for us to succeed.
+    if (expectedEvents != totalEventCount) {
+      LOG.error("Expecting: " + expectedEvents + ", received: " + totalEventCount);
+      throw new HiveException("Incorrect event count in dynamic parition pruning");
+    }
   }
 
   private void prunePartitionSingleSource(String source, SourceInfo si, MapWork work)
@@ -396,7 +408,8 @@ public class DynamicPartitionPruner {
   public void addEvent(InputInitializerEvent event) {
     synchronized(sourcesWaitingForEvents) {
       if (sourcesWaitingForEvents.contains(event.getSourceVertexName())) {
-          queue.offer(event);
+        ++totalEventCount;
+        queue.offer(event);
       }
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Thu Oct 30 16:22:33 2014
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,7 +66,6 @@ import com.google.common.collect.Multima
  * making sure that splits from different partitions are only grouped if they
  * are of the same schema, format and serde
  */
-@SuppressWarnings("deprecation")
 public class HiveSplitGenerator extends InputInitializer {
 
   private static final Log LOG = LogFactory.getLog(HiveSplitGenerator.class);
@@ -72,11 +73,17 @@ public class HiveSplitGenerator extends 
   private static final SplitGrouper grouper = new SplitGrouper();
   private final DynamicPartitionPruner pruner = new DynamicPartitionPruner();
   private InputInitializerContext context;
+  private static Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache =
+      new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>();
 
   public HiveSplitGenerator(InputInitializerContext initializerContext) {
     super(initializerContext);
   }
 
+  public HiveSplitGenerator() {
+    this(null);
+  }
+
   @Override
   public List<Event> initialize() throws Exception {
     InputInitializerContext rootInputContext = getContext();
@@ -150,58 +157,28 @@ public class HiveSplitGenerator extends 
   }
 
 
-  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
       Configuration conf, InputSplit[] splits, float waves, int availableSlots)
       throws Exception {
-    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null);
+    return generateGroupedSplits(jobConf, conf, splits, waves, availableSlots, null, true);
   }
 
-  public static Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
-      Configuration conf, InputSplit[] splits, float waves, int availableSlots,
-      String inputName) throws Exception {
-
-    MapWork work = null;
-    if (inputName != null) {
-      work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
-      // work can still be null if there is no merge work for this input
-    }
-    if (work == null) {
-      work = Utilities.getMapWork(jobConf);
-    }
+  public Multimap<Integer, InputSplit> generateGroupedSplits(JobConf jobConf,
+      Configuration conf, InputSplit[] splits, float waves, int availableSlots, String inputName,
+      boolean groupAcrossFiles) throws Exception {
 
+    MapWork work = populateMapWork(jobConf, inputName);
     Multimap<Integer, InputSplit> bucketSplitMultiMap =
         ArrayListMultimap.<Integer, InputSplit> create();
 
-    Class<?> previousInputFormatClass = null;
-    String previousDeserializerClass = null;
-    Map<Map<String, PartitionDesc>, Map<String, PartitionDesc>> cache =
-        new HashMap<Map<String, PartitionDesc>, Map<String, PartitionDesc>>();
-
     int i = 0;
-
+    InputSplit prevSplit = null;
     for (InputSplit s : splits) {
       // this is the bit where we make sure we don't group across partition
       // schema boundaries
-
-      Path path = ((FileSplit) s).getPath();
-
-      PartitionDesc pd =
-          HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
-              path, cache);
-
-      String currentDeserializerClass = pd.getDeserializerClassName();
-      Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
-
-      if ((currentInputFormatClass != previousInputFormatClass)
-          || (!currentDeserializerClass.equals(previousDeserializerClass))) {
+      if (schemaEvolved(s, prevSplit, groupAcrossFiles, work)) {
         ++i;
-      }
-
-      previousInputFormatClass = currentInputFormatClass;
-      previousDeserializerClass = currentDeserializerClass;
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Adding split " + path + " to src group " + i);
+        prevSplit = s;
       }
       bucketSplitMultiMap.put(i, s);
     }
@@ -214,6 +191,54 @@ public class HiveSplitGenerator extends 
     return groupedSplits;
   }
 
+  private MapWork populateMapWork(JobConf jobConf, String inputName) {
+    MapWork work = null;
+    if (inputName != null) {
+      work = (MapWork) Utilities.getMergeWork(jobConf, inputName);
+      // work can still be null if there is no merge work for this input
+    }
+    if (work == null) {
+      work = Utilities.getMapWork(jobConf);
+    }
+
+    return work;
+  }
+
+  public boolean schemaEvolved(InputSplit s, InputSplit prevSplit, boolean groupAcrossFiles,
+      MapWork work) throws IOException {
+    boolean retval = false;
+    Path path = ((FileSplit) s).getPath();
+    PartitionDesc pd =
+        HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+            path, cache);
+    String currentDeserializerClass = pd.getDeserializerClassName();
+    Class<?> currentInputFormatClass = pd.getInputFileFormatClass();
+
+    Class<?> previousInputFormatClass = null;
+    String previousDeserializerClass = null;
+    if (prevSplit != null) {
+      Path prevPath = ((FileSplit) prevSplit).getPath();
+      if (!groupAcrossFiles) {
+        return !path.equals(prevPath);
+      }
+      PartitionDesc prevPD =
+          HiveFileFormatUtils.getPartitionDescFromPathRecursively(work.getPathToPartitionInfo(),
+              prevPath, cache);
+      previousDeserializerClass = prevPD.getDeserializerClassName();
+      previousInputFormatClass = prevPD.getInputFileFormatClass();
+    }
+
+    if ((currentInputFormatClass != previousInputFormatClass)
+        || (!currentDeserializerClass.equals(previousDeserializerClass))) {
+      retval = true;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Adding split " + path + " to src new group? " + retval);
+    }
+    return retval;
+  }
+
   private List<Event> createEventList(boolean sendSerializedEvents, InputSplitInfoMem inputSplitInfo) {
 
     List<Event> events = Lists.newArrayListWithCapacity(inputSplitInfo.getNumTasks() + 1);

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Thu Oct 30 16:22:33 2014
@@ -17,10 +17,10 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -44,15 +44,15 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.tez.TezProcessor.TezKVOutputCollector;
 import org.apache.hadoop.hive.ql.exec.tez.tools.KeyValueInputMerger;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
-import org.apache.hadoop.hive.ql.io.IOContext;
 import org.apache.hadoop.hive.ql.log.PerfLogger;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.tez.mapreduce.input.MRInputLegacy;
 import org.apache.tez.mapreduce.input.MultiMRInput;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
-import org.apache.tez.runtime.api.Input;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.ProcessorContext;
@@ -73,6 +73,7 @@ public class MapRecordProcessor extends 
   private int position = 0;
   private boolean foundCachedMergeWork = false;
   MRInputLegacy legacyMRInput = null;
+  MultiMRInput mainWorkMultiMRInput = null;
   private ExecMapperContext execContext = null;
   private boolean abort = false;
   protected static final String MAP_PLAN_KEY = "__MAP_PLAN__";
@@ -129,12 +130,14 @@ public class MapRecordProcessor extends 
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.TEZ_INIT_OPERATORS);
     super.init(jconf, processorContext, mrReporter, inputs, outputs);
 
-    //Update JobConf using MRInput, info like filename comes via this
+    // Update JobConf using MRInput, info like filename comes via this
     legacyMRInput = getMRInput(inputs);
-    Configuration updatedConf = legacyMRInput.getConfigUpdates();
-    if (updatedConf != null) {
-      for (Entry<String, String> entry : updatedConf) {
-        jconf.set(entry.getKey(), entry.getValue());
+    if (legacyMRInput != null) {
+      Configuration updatedConf = legacyMRInput.getConfigUpdates();
+      if (updatedConf != null) {
+        for (Entry<String, String> entry : updatedConf) {
+          jconf.set(entry.getKey(), entry.getValue());
+        }
       }
     }
 
@@ -158,8 +161,6 @@ public class MapRecordProcessor extends 
       if (mergeWorkList != null) {
         MapOperator mergeMapOp = null;
         for (MapWork mergeMapWork : mergeWorkList) {
-          processorContext.waitForAnyInputReady(Collections.singletonList((Input) (inputs
-              .get(mergeMapWork.getName()))));
           if (mergeMapWork.getVectorMode()) {
             mergeMapOp = new VectorMapOperator();
           } else {
@@ -235,11 +236,17 @@ public class MapRecordProcessor extends 
   }
 
   private void initializeMapRecordSources() throws Exception {
+
     int size = mergeMapOpList.size() + 1; // the +1 is for the main map operator itself
     sources = new MapRecordSource[size];
-    KeyValueReader reader = legacyMRInput.getReader();
     position = mapOp.getConf().getTag();
     sources[position] = new MapRecordSource();
+    KeyValueReader reader = null;
+    if (mainWorkMultiMRInput != null) {
+      reader = getKeyValueReader(mainWorkMultiMRInput.getKeyValueReaders(), mapOp);
+    } else {
+      reader = legacyMRInput.getReader();
+    }
     sources[position].init(jconf, mapOp, reader);
     for (MapOperator mapOp : mergeMapOpList) {
       int tag = mapOp.getConf().getTag();
@@ -248,13 +255,28 @@ public class MapRecordProcessor extends 
       MultiMRInput multiMRInput = multiMRInputMap.get(inputName);
       Collection<KeyValueReader> kvReaders = multiMRInput.getKeyValueReaders();
       l4j.debug("There are " + kvReaders.size() + " key-value readers for input " + inputName);
-      List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(kvReaders);
-      reader = new KeyValueInputMerger(kvReaderList);
+      reader = getKeyValueReader(kvReaders, mapOp);
       sources[tag].init(jconf, mapOp, reader);
     }
     ((TezContext) MapredContext.get()).setRecordSources(sources);
   }
 
+  @SuppressWarnings("deprecation")
+  private KeyValueReader getKeyValueReader(Collection<KeyValueReader> keyValueReaders,
+      MapOperator mapOp)
+      throws Exception {
+    List<KeyValueReader> kvReaderList = new ArrayList<KeyValueReader>(keyValueReaders);
+    // this sets up the map operator contexts correctly
+    mapOp.initializeContexts();
+    Deserializer deserializer = mapOp.getCurrentDeserializer();
+    KeyValueReader reader =
+        new KeyValueInputMerger(kvReaderList, deserializer,
+            new ObjectInspector[] { deserializer.getObjectInspector() }, mapOp
+                .getConf()
+                .getSortCols());
+    return reader;
+  }
+
   private DummyStoreOperator getJoinParentOp(Operator<? extends OperatorDesc> mergeMapOp) {
     for (Operator<? extends OperatorDesc> childOp : mergeMapOp.getChildOperators()) {
       if ((childOp.getChildOperators() == null) || (childOp.getChildOperators().isEmpty())) {
@@ -269,11 +291,7 @@ public class MapRecordProcessor extends 
   @Override
   void run() throws Exception {
 
-    while (sources[position].pushRecord()) {
-      if (isLogInfoEnabled) {
-        logProgress();
-      }
-    }
+    while (sources[position].pushRecord()) {}
   }
 
   @Override
@@ -305,10 +323,7 @@ public class MapRecordProcessor extends 
         }
       }
 
-      if (isLogInfoEnabled) {
-        logCloseInfo();
-      }
-      ReportStats rps = new ReportStats(reporter);
+      ReportStats rps = new ReportStats(reporter, jconf);
       mapOp.preorderMap(rps);
       return;
     } catch (Exception e) {
@@ -342,7 +357,17 @@ public class MapRecordProcessor extends 
         multiMRInputMap.put(inp.getKey(), (MultiMRInput) inp.getValue());
       }
     }
-    theMRInput.init();
+    if (theMRInput != null) {
+      theMRInput.init();
+    } else {
+      String alias = mapWork.getAliasToWork().keySet().iterator().next();
+      if (inputs.get(alias) instanceof MultiMRInput) {
+        mainWorkMultiMRInput = (MultiMRInput) inputs.get(alias);
+      } else {
+        throw new IOException("Unexpected input type found: "
+            + inputs.get(alias).getClass().getCanonicalName());
+      }
+    }
     return theMRInput;
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java?rev=1635536&r1=1635535&r2=1635536&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordSource.java Thu Oct 30 16:22:33 2014
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.ql.exec.MapOperator;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.mapreduce.input.MRInput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
 /**
@@ -45,7 +43,7 @@ public class MapRecordSource implements 
   private final boolean grouped = false;
 
   void init(JobConf jconf, MapOperator mapOp, KeyValueReader reader) throws IOException {
-    execContext = new ExecMapperContext(jconf);
+    execContext = mapOp.getExecContext();
     this.mapOp = mapOp;
     this.reader = reader;
   }