Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/26 21:16:13 UTC

svn commit: r1476348 [9/29] - in /hive/branches/vectorization: ./ beeline/ beeline/src/java/org/apache/hive/beeline/ beeline/src/test/org/ beeline/src/test/org/apache/ beeline/src/test/org/apache/hive/ beeline/src/test/org/apache/hive/beeline/ beeline/...

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java Fri Apr 26 19:14:49 2013
@@ -38,7 +38,7 @@ public class RCFileKeyBufferWrapper impl
 
   protected CompressionCodec codec;
 
-  protected RCFileKeyBufferWrapper() {
+  public RCFileKeyBufferWrapper() {
   }
 
   public static RCFileKeyBufferWrapper create(KeyBuffer currentKeyBufferObj) {
@@ -66,4 +66,48 @@ public class RCFileKeyBufferWrapper impl
     return keyBuffer;
   }
 
+  public void setKeyBuffer(KeyBuffer keyBuffer) {
+    this.keyBuffer = keyBuffer;
+  }
+
+  public int getRecordLength() {
+    return recordLength;
+  }
+
+  public void setRecordLength(int recordLength) {
+    this.recordLength = recordLength;
+  }
+
+  public int getKeyLength() {
+    return keyLength;
+  }
+
+  public void setKeyLength(int keyLength) {
+    this.keyLength = keyLength;
+  }
+
+  public int getCompressedKeyLength() {
+    return compressedKeyLength;
+  }
+
+  public void setCompressedKeyLength(int compressedKeyLength) {
+    this.compressedKeyLength = compressedKeyLength;
+  }
+
+  public Path getInputPath() {
+    return inputPath;
+  }
+
+  public void setInputPath(Path inputPath) {
+    this.inputPath = inputPath;
+  }
+
+  public CompressionCodec getCodec() {
+    return codec;
+  }
+
+  public void setCodec(CompressionCodec codec) {
+    this.codec = codec;
+  }
+
 }
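
The new public constructor and accessors above make RCFileKeyBufferWrapper usable outside the rcfile.merge package. A minimal sketch of populating a wrapper through them (the class name KeyWrapperSketch and all values are invented for illustration):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.rcfile.merge.RCFileKeyBufferWrapper;

    public class KeyWrapperSketch {
      public static RCFileKeyBufferWrapper newWrapper() {
        // hypothetical values; a real caller would read these from an RCFile block
        RCFileKeyBufferWrapper wrapper = new RCFileKeyBufferWrapper();
        wrapper.setRecordLength(4096);
        wrapper.setKeyLength(512);
        wrapper.setCompressedKeyLength(256);
        wrapper.setInputPath(new Path("/tmp/rcfile/part-00000"));
        wrapper.setCodec(null); // no compression codec in this sketch
        return wrapper;
      }
    }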

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java Fri Apr 26 19:14:49 2013
@@ -48,4 +48,12 @@ public class RCFileValueBufferWrapper im
     return this.valueBuffer.compareTo(o.valueBuffer);
   }
 
+  public ValueBuffer getValueBuffer() {
+    return valueBuffer;
+  }
+
+  public void setValueBuffer(ValueBuffer valueBuffer) {
+    this.valueBuffer = valueBuffer;
+  }
+
 }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java Fri Apr 26 19:14:49 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -87,6 +88,11 @@ public class DefaultStorageHandler imple
   }
 
   @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    //do nothing by default
+  }
+
+  @Override
   public Configuration getConf() {
     return conf;
   }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Fri Apr 26 19:14:49 2013
@@ -76,6 +76,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Role;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.index.HiveIndexHandler;
@@ -1220,8 +1221,8 @@ public class Hive {
           org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
           SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
           /* Construct list bucketing location mappings from sub-directory name. */
-          Map<List<String>, String> skewedColValueLocationMaps = constructListBucketingLocationMap(
-              newPartPath, skewedInfo);
+          Map<SkewedValueList, String> skewedColValueLocationMaps =
+            constructListBucketingLocationMap(newPartPath, skewedInfo);
           /* Add list bucketing location mappings. */
           skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
           newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
@@ -1254,7 +1255,8 @@ public class Hive {
  * @throws IOException
  */
 private void walkDirTree(FileStatus fSta, FileSystem fSys,
-    Map<List<String>, String> skewedColValueLocationMaps, Path newPartPath, SkewedInfo skewedInfo)
+    Map<SkewedValueList, String> skewedColValueLocationMaps,
+    Path newPartPath, SkewedInfo skewedInfo)
     throws IOException {
   /* Base Case. It's leaf. */
   if (!fSta.isDir()) {
@@ -1280,7 +1282,7 @@ private void walkDirTree(FileStatus fSta
  * @param skewedInfo
  */
 private void constructOneLBLocationMap(FileStatus fSta,
-    Map<List<String>, String> skewedColValueLocationMaps,
+    Map<SkewedValueList, String> skewedColValueLocationMaps,
     Path newPartPath, SkewedInfo skewedInfo) {
   Path lbdPath = fSta.getPath().getParent();
   List<String> skewedValue = new ArrayList<String>();
@@ -1303,7 +1305,7 @@ private void constructOneLBLocationMap(F
   }
   if ((skewedValue.size() > 0) && (skewedValue.size() == skewedInfo.getSkewedColNames().size())
       && !skewedColValueLocationMaps.containsKey(skewedValue)) {
-    skewedColValueLocationMaps.put(skewedValue, lbdPath.toString());
+    skewedColValueLocationMaps.put(new SkewedValueList(skewedValue), lbdPath.toString());
   }
 }
 
@@ -1316,9 +1318,10 @@ private void constructOneLBLocationMap(F
    * @throws IOException
    * @throws FileNotFoundException
    */
-  private Map<List<String>, String> constructListBucketingLocationMap(Path newPartPath,
+  private Map<SkewedValueList, String> constructListBucketingLocationMap(Path newPartPath,
       SkewedInfo skewedInfo) throws IOException, FileNotFoundException {
-    Map<List<String>, String> skewedColValueLocationMaps = new HashMap<List<String>, String>();
+    Map<SkewedValueList, String> skewedColValueLocationMaps =
+      new HashMap<SkewedValueList, String>();
     FileSystem fSys = newPartPath.getFileSystem(conf);
     walkDirTree(fSys.getFileStatus(newPartPath), fSys, skewedColValueLocationMaps, newPartPath,
         skewedInfo);
@@ -1668,7 +1671,7 @@ private void constructOneLBLocationMap(F
     List<String> names = null;
     Table t = getTable(dbName, tblName);
 
-    List<String> pvals = getPvals(t.getPartCols(), partSpec);
+    List<String> pvals = MetaStoreUtils.getPvals(t.getPartCols(), partSpec);
 
     try {
       names = getMSC().listPartitionNames(dbName, tblName, pvals, max);
@@ -1710,19 +1713,6 @@ private void constructOneLBLocationMap(F
     }
   }
 
-  public static List<String> getPvals(List<FieldSchema> partCols,
-      Map<String, String> partSpec) {
-    List<String> pvals = new ArrayList<String>();
-    for (FieldSchema field : partCols) {
-      String val = partSpec.get(field.getName());
-      if (val == null) {
-        val = "";
-      }
-      pvals.add(val);
-    }
-    return pvals;
-  }
-
   /**
    * get all the partitions of the table that matches the given partial
    * specification. partition columns whose value is can be anything should be
@@ -1742,7 +1732,7 @@ private void constructOneLBLocationMap(F
           "partitioned table");
     }
 
-    List<String> partialPvals = getPvals(tbl.getPartCols(), partialPartSpec);
+    List<String> partialPvals = MetaStoreUtils.getPvals(tbl.getPartCols(), partialPartSpec);
 
     List<org.apache.hadoop.hive.metastore.api.Partition> partitions = null;
     try {
@@ -2248,6 +2238,18 @@ private void constructOneLBLocationMap(F
     }
   }
 
+  public void exchangeTablePartitions(Map<String, String> partitionSpecs,
+      String sourceDb, String sourceTable, String destDb,
+      String destinationTableName) throws HiveException {
+    try {
+      getMSC().exchange_partition(partitionSpecs, sourceDb, sourceTable, destDb,
+        destinationTableName);
+    } catch (Exception ex) {
+      LOG.error(StringUtils.stringifyException(ex));
+      throw new HiveException(ex);
+    }
+  }
+
   /**
    * Creates a metastore client. Currently it creates only JDBC based client as
    * File based store support is removed
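
The exchangeTablePartitions method added above is a thin wrapper over the metastore client's exchange_partition call. A hypothetical invocation (the partition spec, database and table names are invented):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class ExchangePartitionSketch {
      public static void exchange() throws HiveException {
        // hypothetical partition spec identifying the partition to move
        Map<String, String> partitionSpecs = new HashMap<String, String>();
        partitionSpecs.put("ds", "2013-04-26");
        // moves the matching partition from source_db.source_tbl to dest_db.dest_tbl
        Hive.get().exchangeTablePartitions(partitionSpecs, "source_db", "source_tbl",
            "dest_db", "dest_tbl");
      }
    }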

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java Fri Apr 26 19:14:49 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 
 /**
@@ -133,4 +134,12 @@ public interface HiveStorageHandler exte
   public void configureTableJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties);
+
+  /**
+   * Called just before submitting the MapReduce job.
+   *
+   * @param tableDesc descriptor for the table being accessed
+   * @param jobConf JobConf for the MapReduce job
+   */
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf);
 }
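
With configureJobConf now part of the HiveStorageHandler contract (and a no-op in DefaultStorageHandler), a handler gets one last chance to adjust the JobConf before the MapReduce job is submitted. A minimal, hypothetical handler sketch (the class name and the JobConf key are invented):

    import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler;
    import org.apache.hadoop.hive.ql.plan.TableDesc;
    import org.apache.hadoop.mapred.JobConf;

    public class ExampleStorageHandler extends DefaultStorageHandler {
      @Override
      public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
        // hypothetical: expose the table's serialization properties to the tasks
        // of the submitted job under an invented key
        jobConf.set("example.storage.handler.table.properties",
            tableDesc.getProperties().toString());
      }
    }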

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Partition.java Fri Apr 26 19:14:49 2013
@@ -40,6 +40,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
@@ -647,18 +648,18 @@ public class Partition implements Serial
 
   public void setSkewedValueLocationMap(List<String> valList, String dirName)
       throws HiveException {
-    Map<List<String>, String> mappings = tPartition.getSd().getSkewedInfo()
+    Map<SkewedValueList, String> mappings = tPartition.getSd().getSkewedInfo()
         .getSkewedColValueLocationMaps();
     if (null == mappings) {
-      mappings = new HashMap<List<String>, String>();
+      mappings = new HashMap<SkewedValueList, String>();
       tPartition.getSd().getSkewedInfo().setSkewedColValueLocationMaps(mappings);
     }
 
     // Add or update new mapping
-    mappings.put(valList, dirName);
+    mappings.put(new SkewedValueList(valList), dirName);
   }
 
-  public Map<List<String>, String> getSkewedColValueLocationMaps() {
+  public Map<SkewedValueList, String> getSkewedColValueLocationMaps() {
     return tPartition.getSd().getSkewedInfo().getSkewedColValueLocationMaps();
   }
 }
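
Since the location map is now keyed by the thrift-generated SkewedValueList instead of a raw List<String>, callers have to wrap the value list before looking up a list-bucketing directory. A short sketch against the Partition API above (the class name and skewed values are invented):

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hive.metastore.api.SkewedValueList;
    import org.apache.hadoop.hive.ql.metadata.Partition;

    public class SkewedLocationLookupSketch {
      public static String locationFor(Partition part) {
        // hypothetical skewed values for a partition skewed on two columns
        List<String> values = Arrays.asList("484", "val_484");
        Map<SkewedValueList, String> mappings = part.getSkewedColValueLocationMaps();
        // a plain List<String> is no longer a valid key; wrap it first
        return mappings.get(new SkewedValueList(values));
      }
    }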

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Fri Apr 26 19:14:49 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
@@ -144,7 +145,7 @@ public class Table implements Serializab
       SkewedInfo skewInfo = new SkewedInfo();
       skewInfo.setSkewedColNames(new ArrayList<String>());
       skewInfo.setSkewedColValues(new ArrayList<List<String>>());
-      skewInfo.setSkewedColValueLocationMaps(new HashMap<List<String>, String>());
+      skewInfo.setSkewedColValueLocationMaps(new HashMap<SkewedValueList, String>());
       sd.setSkewedInfo(skewInfo);
     }
 
@@ -518,20 +519,20 @@ public class Table implements Serializab
 
   public void setSkewedValueLocationMap(List<String> valList, String dirName)
       throws HiveException {
-    Map<List<String>, String> mappings = tTable.getSd().getSkewedInfo()
+    Map<SkewedValueList, String> mappings = tTable.getSd().getSkewedInfo()
         .getSkewedColValueLocationMaps();
     if (null == mappings) {
-      mappings = new HashMap<List<String>, String>();
+      mappings = new HashMap<SkewedValueList, String>();
       tTable.getSd().getSkewedInfo().setSkewedColValueLocationMaps(mappings);
     }
 
     // Add or update new mapping
-    mappings.put(valList, dirName);
+    mappings.put(new SkewedValueList(valList), dirName);
   }
 
-  public Map<List<String>,String> getSkewedColValueLocationMaps() {
+  public Map<SkewedValueList,String> getSkewedColValueLocationMaps() {
     return (tTable.getSd().getSkewedInfo() != null) ? tTable.getSd().getSkewedInfo()
-        .getSkewedColValueLocationMaps() : new HashMap<List<String>, String>();
+        .getSkewedColValueLocationMaps() : new HashMap<SkewedValueList, String>();
   }
 
   public void setSkewedColValues(List<List<String>> skewedValues) throws HiveException {

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/MetaDataFormatUtils.java Fri Apr 26 19:14:49 2013
@@ -31,6 +31,7 @@ import org.apache.commons.lang.StringEsc
 import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Index;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.ql.index.HiveIndex;
 import org.apache.hadoop.hive.ql.index.HiveIndex.IndexType;
@@ -62,16 +63,22 @@ public final class MetaDataFormatUtils {
     columnInformation.append(LINE_DELIM);
   }
 
-  public static String getAllColumnsInformation(List<FieldSchema> cols) {
+  public static String getAllColumnsInformation(List<FieldSchema> cols,
+      boolean printHeader) {
     StringBuilder columnInformation = new StringBuilder(DEFAULT_STRINGBUILDER_SIZE);
-    formatColumnsHeader(columnInformation);
+    if(printHeader){
+      formatColumnsHeader(columnInformation);
+    }
     formatAllFields(columnInformation, cols);
     return columnInformation.toString();
   }
 
-  public static String getAllColumnsInformation(List<FieldSchema> cols, List<FieldSchema> partCols) {
+  public static String getAllColumnsInformation(List<FieldSchema> cols, List<FieldSchema> partCols,
+      boolean printHeader) {
     StringBuilder columnInformation = new StringBuilder(DEFAULT_STRINGBUILDER_SIZE);
-    formatColumnsHeader(columnInformation);
+    if(printHeader){
+      formatColumnsHeader(columnInformation);
+    }
     formatAllFields(columnInformation, cols);
 
     if ((partCols != null) && (!partCols.isEmpty())) {
@@ -193,7 +200,7 @@ public final class MetaDataFormatUtils {
         formatOutput("Skewed Values:", skewedColValues.toString(), tableInfo);
       }
 
-      Map<List<String>, String> skewedColMap = storageDesc.getSkewedInfo()
+      Map<SkewedValueList, String> skewedColMap = storageDesc.getSkewedInfo()
           .getSkewedColValueLocationMaps();
       if ((skewedColMap!=null) && (skewedColMap.size() > 0)) {
         formatOutput("Skewed Value to Path:", skewedColMap.toString(),
@@ -201,9 +208,8 @@ public final class MetaDataFormatUtils {
         Map<List<String>, String> truncatedSkewedColMap = new HashMap<List<String>, String>();
         // walk through existing map to truncate path so that test won't mask it
         // then we can verify location is right
-        Set<Entry<List<String>, String>> entries = skewedColMap.entrySet();
-        for (Entry<List<String>, String> entry : entries) {
-          truncatedSkewedColMap.put(entry.getKey(),
+        for (Entry<SkewedValueList, String> entry : skewedColMap.entrySet()) {
+          truncatedSkewedColMap.put(entry.getKey().getSkewedValueList(),
               PlanUtils.removePrefixFromWarehouseConfig(entry.getValue()));
         }
         formatOutput("Skewed Value to Truncated Path:",

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/metadata/formatting/TextMetaDataFormatter.java Fri Apr 26 19:14:49 2013
@@ -19,13 +19,14 @@
 package org.apache.hadoop.hive.ql.metadata.formatting;
 
 import java.io.DataOutputStream;
-import java.io.OutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileStatus;
@@ -143,10 +144,11 @@ public class TextMetaDataFormatter imple
                   MetaDataPrettyFormatUtils.getAllColumnsInformation(
                       cols, partCols, prettyOutputNumCols)
                 :
-                  MetaDataFormatUtils.getAllColumnsInformation(cols, partCols)
+                  MetaDataFormatUtils.getAllColumnsInformation(cols, partCols, isFormatted)
               );
           } else {
-            outStream.writeBytes(MetaDataFormatUtils.getAllColumnsInformation(cols));
+            outStream.writeBytes(
+                MetaDataFormatUtils.getAllColumnsInformation(cols, isFormatted));
           }
 
           if (tableName.equals(colPath)) {
@@ -455,11 +457,13 @@ public class TextMetaDataFormatter imple
         try {
             outStream.writeBytes(database);
             outStream.write(separator);
-            if (comment != null)
-                outStream.writeBytes(comment);
+            if (comment != null) {
+              outStream.writeBytes(comment);
+            }
             outStream.write(separator);
-            if (location != null)
-                outStream.writeBytes(location);
+            if (location != null) {
+              outStream.writeBytes(location);
+            }
             outStream.write(separator);
             if (params != null && !params.isEmpty()) {
                 outStream.writeBytes(params.toString());

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Fri Apr 26 19:14:49 2013
@@ -390,11 +390,10 @@ abstract public class AbstractSMBJoinPro
   // Can the join operator be converted to a sort-merge join operator ?
   // It is already verified that the join can be converted to a bucket map join
   protected boolean checkConvertJoinToSMBJoin(
-    JoinOperator joinOperator,
-    SortBucketJoinProcCtx smbJoinContext,
-    ParseContext pGraphContext) throws SemanticException {
+      JoinOperator joinOperator,
+      SortBucketJoinProcCtx smbJoinContext,
+      ParseContext pGraphContext) throws SemanticException {
 
-    boolean tableEligibleForBucketedSortMergeJoin = true;
     QBJoinTree joinCtx = pGraphContext.getJoinContext().get(joinOperator);
 
     if (joinCtx == null) {
@@ -409,14 +408,15 @@ abstract public class AbstractSMBJoinPro
     List<Order> sortColumnsFirstTable = new ArrayList<Order>();
 
     for (int pos = 0; pos < srcs.length; pos++) {
-      tableEligibleForBucketedSortMergeJoin = tableEligibleForBucketedSortMergeJoin &&
-        isEligibleForBucketSortMergeJoin(smbJoinContext,
-                      pGraphContext,
-                      smbJoinContext.getKeyExprMap().get((byte)pos),
-                      joinCtx,
-                      srcs,
-                      pos,
-                      sortColumnsFirstTable);
+      if (!isEligibleForBucketSortMergeJoin(smbJoinContext,
+          pGraphContext,
+          smbJoinContext.getKeyExprMap().get((byte) pos),
+          joinCtx,
+          srcs,
+          pos,
+          sortColumnsFirstTable)) {
+        return false;
+      }
     }
 
     smbJoinContext.setSrcs(srcs);
@@ -472,6 +472,10 @@ abstract public class AbstractSMBJoinPro
       (BigTableSelectorForAutoSMJ) ReflectionUtils.newInstance(bigTableMatcherClass, null);
     int bigTablePosition =
       bigTableMatcher.getBigTablePosition(pGraphContext, joinOp);
+    if (bigTablePosition < 0) {
+      // contains aliases from sub-query
+      return false;
+    }
     context.setBigTablePosition(bigTablePosition);
     String joinAlias =
       bigTablePosition == 0 ?
@@ -489,9 +493,12 @@ abstract public class AbstractSMBJoinPro
     }
 
     context.setKeyExprMap(keyExprMap);
-    String[] srcs = joinCtx.getBaseSrc();
-    for (int srcPos = 0; srcPos < srcs.length; srcPos++) {
-      srcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), srcs[srcPos]);
+    // Make a deep copy of the aliases so that they are not changed in the context
+    String[] joinSrcs = joinCtx.getBaseSrc();
+    String[] srcs = new String[joinSrcs.length];
+    for (int srcPos = 0; srcPos < joinSrcs.length; srcPos++) {
+      joinSrcs[srcPos] = QB.getAppendedAliasFromId(joinCtx.getId(), joinSrcs[srcPos]);
+      srcs[srcPos] = new String(joinSrcs[srcPos]);
     }
 
     // Given a candidate map-join, can this join be converted.

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java Fri Apr 26 19:14:49 2013
@@ -57,6 +57,9 @@ public class AvgPartitionSizeBasedBigTab
       getListTopOps(joinOp, topOps);
       int currentPos = 0;
       for (TableScanOperator topOp : topOps) {
+        if (topOp == null) {
+          return -1;
+        }
         int numPartitions = 1; // in case the sizes match, preference is
                                // given to the table with fewer partitions
         Table table = parseCtx.getTopToTable().get(topOp);

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Fri Apr 26 19:14:49 2013
@@ -249,6 +249,7 @@ public class BucketingSortingReduceSinkO
       fsOp.getConf().setMultiFileSpray(false);
       fsOp.getConf().setTotalFiles(1);
       fsOp.getConf().setNumFiles(1);
+      fsOp.getConf().setRemovedReduceSinkBucketSort(true);
       tsOp.setUseBucketizedHiveInputFormat(true);
     }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Fri Apr 26 19:14:49 2013
@@ -204,7 +204,9 @@ public class GenMRFileSink1 implements N
           (MapredWork) currTask.getWork(), false, ctx);
       }
 
-      if (!rootTasks.contains(currTask)) {
+      if (!rootTasks.contains(currTask)
+          && (currTask.getParentTasks() == null
+              || currTask.getParentTasks().isEmpty())) {
         rootTasks.add(currTask);
       }
     }
@@ -721,7 +723,9 @@ public class GenMRFileSink1 implements N
               (MapredWork) currTask.getWork(), false, ctx);
         }
         opTaskMap.put(null, currTask);
-        if (!rootTasks.contains(currTask)) {
+        if (!rootTasks.contains(currTask)
+            && (currTask.getParentTasks() == null
+                || currTask.getParentTasks().isEmpty())) {
           rootTasks.add(currTask);
         }
       } else {

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Fri Apr 26 19:14:49 2013
@@ -259,12 +259,17 @@ public class GenMRUnion1 implements Node
     // Copy into the current union task plan if
     if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos)) {
       processSubQueryUnionMerge(ctx, uCtxTask, union, stack);
+      if (ctx.getRootTasks().contains(currTask)) {
+        ctx.getRootTasks().remove(currTask);
+      }
     }
     // If it a map-reduce job, create a temporary file
     else {
       // is the current task a root task
       if (shouldBeRootTask(currTask)
-          && (!ctx.getRootTasks().contains(currTask))) {
+          && !ctx.getRootTasks().contains(currTask)
+          && (currTask.getParentTasks() == null
+              || currTask.getParentTasks().isEmpty())) {
         ctx.getRootTasks().add(currTask);
       }
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Fri Apr 26 19:14:49 2013
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -106,7 +108,9 @@ public final class GenMapRedUtils {
 
     List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
 
-    if (!rootTasks.contains(currTask)) {
+    if (!rootTasks.contains(currTask)
+        && (currTask.getParentTasks() == null
+            || currTask.getParentTasks().isEmpty())) {
       rootTasks.add(currTask);
     }
     if (reducer.getClass() == JoinOperator.class) {
@@ -759,6 +763,41 @@ public final class GenMapRedUtils {
   }
 
   /**
+   * Set the key and value description for all the tasks rooted at the given
+   * task. Loops over all the tasks recursively.
+   *
+   * @param task
+   */
+  public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task) {
+
+    if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+          .getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        setKeyAndValueDescForTaskTree(tsk);
+      }
+    } else if (task instanceof ExecDriver) {
+      MapredWork work = (MapredWork) task.getWork();
+      work.deriveExplainAttributes();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = work
+          .getAliasToWork();
+      if (opMap != null && !opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          setKeyAndValueDesc(work, op);
+        }
+      }
+    }
+
+    if (task.getChildTasks() == null) {
+      return;
+    }
+
+    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+      setKeyAndValueDescForTaskTree(childTask);
+    }
+  }
+
+  /**
    * create a new plan and return.
    *
    * @return the new plan
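
The new setKeyAndValueDescForTaskTree utility above recurses into ConditionalTask members and ExecDriver works, then into child tasks. A hypothetical caller would apply it to every root task once the plan has been assembled (the wrapper class and method names are invented):

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;
    import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;

    public class KeyValueDescSketch {
      public static void deriveDescs(List<Task<? extends Serializable>> rootTasks) {
        // derive key/value descriptors for each task tree in the compiled plan
        for (Task<? extends Serializable> rootTask : rootTasks) {
          GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
        }
      }
    }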

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Fri Apr 26 19:14:49 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -107,7 +109,7 @@ public class GroupByOptimizer implements
     GraphWalker ogw = new DefaultGraphWalker(disp);
 
     // Create a list of topop nodes
-    ArrayList<Node> topNodes = new ArrayList<Node>();
+    List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getTopOps().values());
     ogw.startWalking(topNodes, null);
 
@@ -174,15 +176,83 @@ public class GroupByOptimizer implements
       GroupByOptimizerSortMatch match = checkSortGroupBy(stack, groupByOp);
       boolean useMapperSort =
           HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);
+      GroupByDesc groupByOpDesc = groupByOp.getConf();
+
+      boolean removeReduceSink = false;
+      boolean optimizeDistincts = false;
+      boolean setBucketGroup = false;
 
       // Dont remove the operator for distincts
-      if (useMapperSort && !groupByOp.getConf().isDistinct() &&
+      if (useMapperSort &&
           (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
-        convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
+        if (!groupByOpDesc.isDistinct()) {
+          removeReduceSink = true;
+        }
+        else if (!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+          // Optimize the query: select count(distinct keys) from T, where
+          // T is bucketized and sorted by the distinct keys
+          // Partial aggregation can be done by the mappers in this scenario
+
+          List<ExprNodeDesc> keys =
+              ((GroupByOperator)
+              (groupByOp.getChildOperators().get(0).getChildOperators().get(0)))
+                  .getConf().getKeys();
+          if ((keys == null) || (keys.isEmpty())) {
+            optimizeDistincts = true;
+          }
+        }
       }
-      else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
+
+      if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
           (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
-        groupByOp.getConf().setBucketGroup(true);
+        setBucketGroup = true;
+      }
+
+      if (removeReduceSink) {
+        convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
+      }
+      else if (optimizeDistincts) {
+        // In test mode, don't change the query plan. However, set up a query property
+        pGraphContext.getQueryProperties().setHasMapGroupBy(true);
+        if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE)) {
+          return;
+        }
+        ReduceSinkOperator reduceSinkOp =
+            (ReduceSinkOperator)groupByOp.getChildOperators().get(0);
+        GroupByDesc childGroupByDesc =
+            ((GroupByOperator)
+            (reduceSinkOp.getChildOperators().get(0))).getConf();
+
+        for (int pos = 0; pos < childGroupByDesc.getAggregators().size(); pos++) {
+          AggregationDesc aggr = childGroupByDesc.getAggregators().get(pos);
+          // Partial aggregation is not done for distincts on the mapper
+          // However, if the data is bucketed/sorted on the distinct key, partial aggregation
+          // can be performed on the mapper.
+          if (aggr.getDistinct()) {
+            ArrayList<ExprNodeDesc> parameters = new ArrayList<ExprNodeDesc>();
+            ExprNodeDesc param = aggr.getParameters().get(0);
+            assert param instanceof ExprNodeColumnDesc;
+            ExprNodeColumnDesc paramC = (ExprNodeColumnDesc) param;
+            paramC.setIsPartitionColOrVirtualCol(false);
+            paramC.setColumn("VALUE._col" + pos);
+            parameters.add(paramC);
+            aggr.setParameters(parameters);
+            aggr.setDistinct(false);
+            aggr.setMode(Mode.FINAL);
+          }
+        }
+        // Partial aggregation is performed on the mapper, no distinct processing at the reducer
+        childGroupByDesc.setDistinct(false);
+        groupByOpDesc.setDontResetAggrsDistinct(true);
+        groupByOpDesc.setBucketGroup(true);
+        groupByOp.setUseBucketizedHiveInputFormat(true);
+        // no distinct processing at the reducer
+        // A query like 'select count(distinct key) from T' is transformed into
+        // 'select count(key) from T' as far as the reducer is concerned.
+        reduceSinkOp.getConf().setDistinctColumnIndices(new ArrayList<List<Integer>>());
+      }
+      else if (setBucketGroup) {
+        groupByOpDesc.setBucketGroup(true);
       }
     }
 
@@ -339,8 +409,8 @@ public class GroupByOptimizer implements
 
         GroupByOptimizerSortMatch currentMatch =
             notDeniedPartns.isEmpty() ? GroupByOptimizerSortMatch.NO_MATCH :
-              notDeniedPartns.size() > 1 ? GroupByOptimizerSortMatch.PARTIAL_MATCH :
-                GroupByOptimizerSortMatch.COMPLETE_MATCH;
+                notDeniedPartns.size() > 1 ? GroupByOptimizerSortMatch.PARTIAL_MATCH :
+                    GroupByOptimizerSortMatch.COMPLETE_MATCH;
         for (Partition part : notDeniedPartns) {
           List<String> sortCols = part.getSortColNames();
           List<String> bucketCols = part.getBucketCols();
@@ -440,8 +510,9 @@ public class GroupByOptimizer implements
       case NO_MATCH:
         return GroupByOptimizerSortMatch.NO_MATCH;
       case COMPLETE_MATCH:
-        return ((bucketCols != null) && !bucketCols.isEmpty() && sortCols.containsAll(bucketCols)) ?
-          GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
+        return ((bucketCols != null) && !bucketCols.isEmpty() &&
+            sortCols.containsAll(bucketCols)) ?
+            GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
       case PREFIX_COL1_MATCH:
         return GroupByOptimizerSortMatch.NO_MATCH;
       case PREFIX_COL2_MATCH:

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinFactory.java Fri Apr 26 19:14:49 2013
@@ -163,8 +163,11 @@ public final class MapJoinFactory {
       opTaskMap.put(op, currTask);
 
       List<Task<? extends Serializable>> rootTasks = opProcCtx.getRootTasks();
-      assert (!rootTasks.contains(currTask));
-      rootTasks.add(currTask);
+      if(!rootTasks.contains(currTask)
+         && (currTask.getParentTasks() == null
+             || currTask.getParentTasks().isEmpty())) {
+        rootTasks.add(currTask);
+      }
 
       assert currTopOp != null;
       opProcCtx.getSeenOps().add(currTopOp);
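
The guard introduced here (only register currTask as a root task if it is not already registered and has no parent tasks) is the same condition this revision adds in GenMRFileSink1, GenMRUnion1 and GenMapRedUtils. A hypothetical helper capturing the repeated check in one place (class and method names are invented):

    import java.io.Serializable;
    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Task;

    public final class RootTaskSketch {
      private RootTaskSketch() {
      }

      // add currTask as a root task only if it is genuinely a root:
      // not yet registered and without any parent task
      public static void addRootTaskIfNeeded(
          List<Task<? extends Serializable>> rootTasks,
          Task<? extends Serializable> currTask) {
        if (!rootTasks.contains(currTask)
            && (currTask.getParentTasks() == null
                || currTask.getParentTasks().isEmpty())) {
          rootTasks.add(currTask);
        }
      }
    }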

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Fri Apr 26 19:14:49 2013
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.Op
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
+import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.ScriptOperator;
 import org.apache.hadoop.hive.ql.exec.SelectOperator;
 import org.apache.hadoop.hive.ql.exec.UnionOperator;
@@ -74,6 +75,7 @@ import org.apache.hadoop.hive.ql.plan.Op
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
+import org.apache.hadoop.hive.ql.plan.SMBJoinDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -88,6 +90,10 @@ import org.apache.hadoop.hive.serde2.typ
 public class MapJoinProcessor implements Transform {
 
   private static final Log LOG = LogFactory.getLog(MapJoinProcessor.class.getName());
+  // mapjoin table descriptor contains a key descriptor which needs the field schema
+  // (column type + column name). The column name is not really used anywhere, but it
+  // needs to be passed. Use the string defined below for that.
+  private static final String MAPJOINKEY_FIELDPREFIX = "mapjoinkey";
 
   private ParseContext pGraphContext;
 
@@ -107,9 +113,11 @@ public class MapJoinProcessor implements
   }
 
   /**
-   * Generate the MapRed Local Work
+   * Generate the MapRed Local Work for the given map-join operator
+   *
    * @param newWork
    * @param mapJoinOp
+   *          map-join operator for which local work needs to be generated.
    * @param bigTablePos
    * @return
    * @throws SemanticException
@@ -219,15 +227,31 @@ public class MapJoinProcessor implements
     return bigTableAlias;
   }
 
+  /**
+   * Convert the join to a map-join and also generate any local work needed.
+   *
+   * @param newWork MapredWork in which the conversion is to happen
+   * @param op
+   *          The join operator that needs to be converted to map-join
+   * @param bigTablePos
+   * @return the alias to the big table
+   * @throws SemanticException
+   */
   public static String genMapJoinOpAndLocalWork(MapredWork newWork, JoinOperator op, int mapJoinPos)
-    throws SemanticException {
-    try {
-      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
+      throws SemanticException {
+    LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap =
         newWork.getOpParseCtxMap();
-      QBJoinTree newJoinTree = newWork.getJoinTree();
-      // generate the map join operator; already checked the map join
-      MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
-          newJoinTree, mapJoinPos, true, false);
+    QBJoinTree newJoinTree = newWork.getJoinTree();
+    // generate the map join operator; already checked the map join
+    MapJoinOperator newMapJoinOp = MapJoinProcessor.convertMapJoin(opParseCtxMap, op,
+        newJoinTree, mapJoinPos, true, false);
+    return genLocalWorkForMapJoin(newWork, newMapJoinOp, mapJoinPos);
+  }
+
+  public static String genLocalWorkForMapJoin(MapredWork newWork, MapJoinOperator newMapJoinOp,
+      int mapJoinPos)
+      throws SemanticException {
+    try {
       // generate the local work and return the big table alias
       String bigTableAlias = MapJoinProcessor
           .genMapJoinLocalWork(newWork, newMapJoinOp, mapJoinPos);
@@ -422,7 +446,7 @@ public class MapJoinProcessor implements
     }
 
     TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
-        .getFieldSchemasFromColumnList(keyCols, "mapjoinkey"));
+        .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
 
     List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
     List<TableDesc> valueFiltedTableDescs = new ArrayList<TableDesc>();
@@ -501,6 +525,65 @@ public class MapJoinProcessor implements
     return mapJoinOp;
   }
 
+  /**
+   * convert a sortmerge join to a a map-side join.
+   *
+   * @param opParseCtxMap
+   * @param smbJoinOp
+   *          join operator
+   * @param joinTree
+   *          qb join tree
+   * @param bigTablePos
+   *          position of the source to be read as part of map-reduce framework. All other sources
+   *          are cached in memory
+   * @param noCheckOuterJoin
+   */
+  public static MapJoinOperator convertSMBJoinToMapJoin(
+    Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+    SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, int bigTablePos, boolean noCheckOuterJoin)
+    throws SemanticException {
+    // Create a new map join operator
+    SMBJoinDesc smbJoinDesc = smbJoinOp.getConf();
+    List<ExprNodeDesc> keyCols = smbJoinDesc.getKeys().get(Byte.valueOf((byte) 0));
+    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(PlanUtils
+        .getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+    MapJoinDesc mapJoinDesc = new MapJoinDesc(smbJoinDesc.getKeys(),
+        keyTableDesc, smbJoinDesc.getExprs(),
+        smbJoinDesc.getValueTblDescs(), smbJoinDesc.getValueTblDescs(),
+        smbJoinDesc.getOutputColumnNames(),
+        bigTablePos, smbJoinDesc.getConds(),
+        smbJoinDesc.getFilters(), smbJoinDesc.isNoOuterJoin(), smbJoinDesc.getDumpFilePrefix());
+
+    RowResolver joinRS = opParseCtxMap.get(smbJoinOp).getRowResolver();
+    // The mapjoin has the same schema as the join operator
+    MapJoinOperator mapJoinOp = (MapJoinOperator) OperatorFactory.getAndMakeChild(
+        mapJoinDesc, joinRS.getRowSchema(),
+        new ArrayList<Operator<? extends OperatorDesc>>());
+
+    OpParseContext ctx = new OpParseContext(joinRS);
+    opParseCtxMap.put(mapJoinOp, ctx);
+
+    // change the children of the original join operator to point to the map
+    // join operator
+    List<Operator<? extends OperatorDesc>> childOps = smbJoinOp.getChildOperators();
+    for (Operator<? extends OperatorDesc> childOp : childOps) {
+      childOp.replaceParent(smbJoinOp, mapJoinOp);
+    }
+    mapJoinOp.setChildOperators(childOps);
+    smbJoinOp.setChildOperators(null);
+
+    // change the parent of the original SMBjoin operator to point to the map
+    // join operator
+    List<Operator<? extends OperatorDesc>> parentOps = smbJoinOp.getParentOperators();
+    for (Operator<? extends OperatorDesc> parentOp : parentOps) {
+      parentOp.replaceChild(smbJoinOp, mapJoinOp);
+    }
+    mapJoinOp.setParentOperators(parentOps);
+    smbJoinOp.setParentOperators(null);
+
+    return mapJoinOp;
+  }
+
   public MapJoinOperator generateMapJoinOperator(ParseContext pctx, JoinOperator op,
       QBJoinTree joinTree, int mapJoinPos) throws SemanticException {
     HiveConf hiveConf = pctx.getConf();
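
With the conversion split into convertSMBJoinToMapJoin and genLocalWorkForMapJoin above, a physical-optimizer caller can turn a sort-merge join back into a map-join and then build its local work in two steps. A hypothetical sketch using only the signatures shown in this diff (the wrapper class is invented; all arguments are assumed to come from the optimizer context):

    import java.util.Map;

    import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
    import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
    import org.apache.hadoop.hive.ql.parse.OpParseContext;
    import org.apache.hadoop.hive.ql.parse.QBJoinTree;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.plan.MapredWork;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    public class SmbToMapJoinSketch {
      // convert the SMB join to a map-join and generate its local work,
      // returning the big table alias
      public static String convert(
          Map<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
          SMBMapJoinOperator smbJoinOp, QBJoinTree joinTree, MapredWork newWork,
          int bigTablePos) throws SemanticException {
        MapJoinOperator mapJoinOp = MapJoinProcessor.convertSMBJoinToMapJoin(
            opParseCtxMap, smbJoinOp, joinTree, bigTablePos, true /* noCheckOuterJoin */);
        return MapJoinProcessor.genLocalWorkForMapJoin(newWork, mapJoinOp, bigTablePos);
      }
    }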

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SizeBasedBigTableSelectorForAutoSMJ.java Fri Apr 26 19:14:49 2013
@@ -23,6 +23,7 @@ import java.util.List;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.CommonJoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
@@ -43,9 +44,10 @@ public abstract class SizeBasedBigTableS
 
     for (Operator<? extends OperatorDesc> parentOp : op.getParentOperators()) {
       if (parentOp instanceof TableScanOperator) {
-        topOps.add((TableScanOperator)parentOp);
-      }
-      else {
+        topOps.add((TableScanOperator) parentOp);
+      } else if (parentOp instanceof CommonJoinOperator) {
+        topOps.add(null);
+      } else {
         getListTopOps(parentOp, topOps);
       }
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/TableSizeBasedBigTableSelectorForAutoSMJ.java Fri Apr 26 19:14:49 2013
@@ -49,6 +49,9 @@ implements BigTableSelectorForAutoSMJ {
       getListTopOps(joinOp, topOps);
       int currentPos = 0;
       for (TableScanOperator topOp : topOps) {
+        if (topOp == null) {
+          return -1;
+        }
         Table table = parseCtx.getTopToTable().get(topOp);
         long currentSize = 0;
 

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/listbucketingpruner/ListBucketingPruner.java Fri Apr 26 19:14:49 2013
@@ -27,6 +27,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.optimizer.PrunerUtils;
@@ -290,12 +291,13 @@ public class ListBucketingPruner impleme
       List<List<String>> uniqSkewedValues) throws SemanticException {
     // For each entry in dynamic-multi-dimension collection.
     List<String> skewedCols = part.getSkewedColNames(); // Retrieve skewed column.
-    Map<List<String>, String> mappings = part.getSkewedColValueLocationMaps(); // Retrieve skewed
+    Map<SkewedValueList, String> mappings = part.getSkewedColValueLocationMaps(); // Retrieve skewed
                                                                                // map.
     assert ListBucketingPrunerUtils.isListBucketingPart(part) : part.getName()
         + " skewed metadata is corrupted. No skewed column and/or location mappings information.";
     List<List<String>> skewedValues = part.getSkewedColValues();
     List<Boolean> nonSkewedValueMatchResult = new ArrayList<Boolean>();
+    SkewedValueList skewedValueList = new SkewedValueList();
     for (List<String> cell : collections) {
       // Walk through the tree to decide value.
       // Example: skewed column: C1, C2 ;
@@ -309,8 +311,9 @@ public class ListBucketingPruner impleme
           /* It's valid case if a partition: */
           /* 1. is defined with skewed columns and skewed values in metadata */
           /* 2. doesn't have all skewed values within its data */
-          if (mappings.get(cell) != null) {
-            selectedPaths.add(new Path(mappings.get(cell)));
+          skewedValueList.setSkewedValueList(cell);
+          if (mappings.get(skewedValueList) != null) {
+            selectedPaths.add(new Path(mappings.get(skewedValueList)));
           }
         }
       } else {

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/CommonJoinResolver.java Fri Apr 26 19:14:49 2013
@@ -17,46 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.optimizer.physical;
 
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
-import java.util.Stack;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.hive.common.ObjectPair;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.exec.ConditionalTask;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
-import org.apache.hadoop.hive.ql.exec.JoinOperator;
-import org.apache.hadoop.hive.ql.exec.MapRedTask;
-import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.Task;
-import org.apache.hadoop.hive.ql.exec.TaskFactory;
-import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.TaskGraphWalker;
-import org.apache.hadoop.hive.ql.lib.TaskGraphWalker.TaskGraphWalkerContext;
-import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
-import org.apache.hadoop.hive.ql.parse.ParseContext;
-import org.apache.hadoop.hive.ql.parse.QBJoinTree;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin;
-import org.apache.hadoop.hive.ql.plan.ConditionalResolverCommonJoin.ConditionalResolverCommonJoinCtx;
-import org.apache.hadoop.hive.ql.plan.ConditionalWork;
-import org.apache.hadoop.hive.ql.plan.JoinDesc;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 
 /*
  * Convert tasks involving JOIN into MAPJOIN.
@@ -105,499 +72,11 @@ public class CommonJoinResolver implemen
     TaskGraphWalker ogw = new TaskGraphWalker(disp);
 
     // get all the tasks nodes from root task
-    ArrayList<Node> topNodes = new ArrayList<Node>();
+    List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.rootTasks);
 
     // begin to walk through the task tree.
     ogw.startWalking(topNodes, null);
     return pctx;
   }
-
-  /**
-   * Iterate over each task. If a task has local work, create a new task (a MapredLocalTask) for
-   * that local work, make the new task depend on the current task's parent tasks, and make the
-   * current task depend on the new task.
-   */
-  class CommonJoinTaskDispatcher implements Dispatcher {
-
-    private final PhysicalContext physicalContext;
-
-    public CommonJoinTaskDispatcher(PhysicalContext context) {
-      super();
-      physicalContext = context;
-    }
-
-    // Get the position of the big table for this join operator and the given alias
-    private int getPosition(MapredWork work, Operator<? extends OperatorDesc> joinOp,
-        String alias) {
-      Operator<? extends OperatorDesc> parentOp = work.getAliasToWork().get(alias);
-
-      // A ReduceSink's child is null here, but the JoinOperator's parents are the ReduceSinks.
-      while ((parentOp.getChildOperators() != null) &&
-          (!parentOp.getChildOperators().isEmpty())) {
-        parentOp = parentOp.getChildOperators().get(0);
-      }
-
-      return joinOp.getParentOperators().indexOf(parentOp);
-    }
-
-    /*
-     * A task and its child task have been converted from join to mapjoin.
-     * See if the two tasks can be merged.
-     */
-    private void mergeMapJoinTaskWithChildMapJoinTask(MapRedTask task) {
-      MapRedTask childTask = (MapRedTask)task.getChildTasks().get(0);
-      MapredWork work = task.getWork();
-      MapredLocalWork localWork = work.getMapLocalWork();
-      MapredWork childWork = childTask.getWork();
-      MapredLocalWork childLocalWork = childWork.getMapLocalWork();
-
-      // Can this be merged
-      Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
-      if (aliasToWork.size() > 1) {
-        return;
-      }
-
-      Operator<? extends OperatorDesc> op = aliasToWork.values().iterator().next();
-      while (op.getChildOperators() != null) {
-        // Don't perform this optimization for multi-table inserts.
-        if (op.getChildOperators().size() > 1) {
-          return;
-        }
-        op = op.getChildOperators().get(0);
-      }
-
-      if (!(op instanceof FileSinkOperator)) {
-        return;
-      }
-
-      FileSinkOperator fop = (FileSinkOperator)op;
-      String workDir = fop.getConf().getDirName();
-
-      Map<String, ArrayList<String>> childPathToAliases = childWork.getPathToAliases();
-      if (childPathToAliases.size() > 1) {
-        return;
-      }
-
-      // The filesink writes to a different directory
-      if (!childPathToAliases.keySet().iterator().next().equals(workDir)) {
-        return;
-      }
-
-      // Neither of them should be bucketed.
-      if ((localWork.getBucketMapjoinContext() != null) ||
-          (childLocalWork.getBucketMapjoinContext() != null)) {
-        return;
-      }
-
-      // Merge the trees
-      if (childWork.getAliasToWork().size() > 1) {
-        return;
-      }
-
-      Operator<? extends Serializable> childAliasOp =
-          childWork.getAliasToWork().values().iterator().next();
-      if (fop.getParentOperators().size() > 1) {
-        return;
-      }
-
-      // Merge the 2 trees - remove the FileSinkOperator from the first tree and attach the
-      // top operator of the second tree in its place.
-      Operator<? extends Serializable> parentFOp = fop.getParentOperators().get(0);
-      parentFOp.getChildOperators().remove(fop);
-      parentFOp.getChildOperators().add(childAliasOp);
-      List<Operator<? extends OperatorDesc>> parentOps =
-          new ArrayList<Operator<? extends OperatorDesc>>();
-      parentOps.add(parentFOp);
-      childAliasOp.setParentOperators(parentOps);
-
-      work.getAliasToPartnInfo().putAll(childWork.getAliasToPartnInfo());
-      for (Map.Entry<String, PartitionDesc> childWorkEntry :
-        childWork.getPathToPartitionInfo().entrySet()) {
-        if (childWork.getAliasToPartnInfo().containsValue(childWorkEntry.getKey())) {
-          work.getPathToPartitionInfo().put(childWorkEntry.getKey(), childWorkEntry.getValue());
-        }
-      }
-
-      localWork.getAliasToFetchWork().putAll(childLocalWork.getAliasToFetchWork());
-      localWork.getAliasToWork().putAll(childLocalWork.getAliasToWork());
-
-      // remove the child task
-      List<Task<? extends Serializable>> oldChildTasks = childTask.getChildTasks();
-      task.setChildTasks(oldChildTasks);
-      if (oldChildTasks != null) {
-        for (Task<? extends Serializable> oldChildTask : oldChildTasks) {
-          oldChildTask.getParentTasks().remove(childTask);
-          oldChildTask.getParentTasks().add(task);
-        }
-      }
-    }
-
-    // create map join task and set big table as bigTablePosition
-    private ObjectPair<MapRedTask, String> convertTaskToMapJoinTask(MapredWork newWork,
-        int bigTablePosition) throws SemanticException {
-      // create a mapred task for this work
-      MapRedTask newTask = (MapRedTask) TaskFactory.get(newWork, physicalContext
-          .getParseContext().getConf());
-      JoinOperator newJoinOp = getJoinOp(newTask);
-
-      // optimize this newWork, assuming the big table is at bigTablePosition
-      String bigTableAlias =
-          MapJoinProcessor.genMapJoinOpAndLocalWork(newWork, newJoinOp, bigTablePosition);
-      return new ObjectPair<MapRedTask, String>(newTask, bigTableAlias);
-    }
-
-    private Task<? extends Serializable> processCurrentTask(MapRedTask currTask,
-        ConditionalTask conditionalTask, Context context)
-        throws SemanticException {
-
-      // Check whether the task contains a common join operator; if it does, retrieve it.
-      JoinOperator joinOp = getJoinOp(currTask);
-      if (joinOp == null || joinOp.getConf().isFixedAsSorted()) {
-        return null;
-      }
-      currTask.setTaskTag(Task.COMMON_JOIN);
-
-      MapredWork currWork = currTask.getWork();
-
-      // create conditional work list and task list
-      List<Serializable> listWorks = new ArrayList<Serializable>();
-      List<Task<? extends Serializable>> listTasks = new ArrayList<Task<? extends Serializable>>();
-
-      // create alias to task mapping and alias to input file mapping for resolver
-      HashMap<String, Task<? extends Serializable>> aliasToTask = new HashMap<String, Task<? extends Serializable>>();
-      HashMap<String, ArrayList<String>> pathToAliases = currWork.getPathToAliases();
-      Map<String, Operator<? extends OperatorDesc>> aliasToWork = currWork.getAliasToWork();
-
-      // get parseCtx for this Join Operator
-      ParseContext parseCtx = physicalContext.getParseContext();
-      QBJoinTree joinTree = parseCtx.getJoinContext().get(joinOp);
-
-      // start to generate multiple map join tasks
-      JoinDesc joinDesc = joinOp.getConf();
-      Byte[] order = joinDesc.getTagOrder();
-      int numAliases = order.length;
-
-      long aliasTotalKnownInputSize = 0;
-      HashMap<String, Long> aliasToSize = new HashMap<String, Long>();
-      try {
-        // go over all the input paths, and calculate a known total size, known
-        // size for each input alias.
-        Utilities.getInputSummary(context, currWork, null).getLength();
-
-        // Set the alias-to-size mapping; if one table is chosen as the big table, this is used
-        // to determine the total size of the remaining tables, which are going to be the
-        // small tables.
-        for (Map.Entry<String, ArrayList<String>> entry : pathToAliases.entrySet()) {
-          String path = entry.getKey();
-          List<String> aliasList = entry.getValue();
-          ContentSummary cs = context.getCS(path);
-          if (cs != null) {
-            long size = cs.getLength();
-            for (String alias : aliasList) {
-              aliasTotalKnownInputSize += size;
-              Long es = aliasToSize.get(alias);
-              if (es == null) {
-                es = new Long(0);
-              }
-              es += size;
-              aliasToSize.put(alias, es);
-            }
-          }
-        }
-
-        HashSet<Integer> bigTableCandidates = MapJoinProcessor.getBigTableCandidates(joinDesc.getConds());
-
-        // no table could be the big table; there is no need to convert
-        if (bigTableCandidates == null) {
-          return null;
-        }
-
-        Configuration conf = context.getConf();
-
-        // If the sizes of at least n-1 tables in an n-way join are known, and their sum is smaller
-        // than the threshold size, convert the join into a map-join and don't create a conditional task.
-        boolean convertJoinMapJoin = HiveConf.getBoolVar(conf,
-            HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASK);
-        int bigTablePosition = -1;
-        if (convertJoinMapJoin) {
-          // This is the user-specified size threshold for doing the map-join conversion unconditionally
-          long mapJoinSize = HiveConf.getLongVar(conf,
-              HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
-
-          boolean bigTableFound = false;
-          long largestBigTableCandidateSize = 0;
-          long sumTableSizes = 0;
-          for (String alias : aliasToWork.keySet()) {
-            int tablePosition = getPosition(currWork, joinOp, alias);
-            boolean bigTableCandidate = bigTableCandidates.contains(tablePosition);
-            Long size = aliasToSize.get(alias);
-            // The size is not available at compile time if the input is a sub-query.
-            // If the sizes of at least n-1 inputs for an n-way join are available at compile time,
-            // and their sum is less than the specified threshold, then convert the join
-            // into a map-join without the conditional task.
-            if ((size == null) || (size > mapJoinSize)) {
-              sumTableSizes += largestBigTableCandidateSize;
-              if (bigTableFound || (sumTableSizes > mapJoinSize) || !bigTableCandidate) {
-                convertJoinMapJoin = false;
-                break;
-              }
-              bigTableFound = true;
-              bigTablePosition = tablePosition;
-              largestBigTableCandidateSize = mapJoinSize + 1;
-            } else {
-              if (bigTableCandidate && size > largestBigTableCandidateSize) {
-                bigTablePosition = tablePosition;
-                sumTableSizes += largestBigTableCandidateSize;
-                largestBigTableCandidateSize = size;
-              }
-              else {
-                sumTableSizes += size;
-              }
-
-              if (sumTableSizes > mapJoinSize) {
-                convertJoinMapJoin = false;
-                break;
-              }
-            }
-          }
-        }
-
-        String bigTableAlias = null;
-        currWork.setOpParseCtxMap(parseCtx.getOpParseCtx());
-        currWork.setJoinTree(joinTree);
-
-        if (convertJoinMapJoin) {
-          // create map join task and set big table as bigTablePosition
-          MapRedTask newTask = convertTaskToMapJoinTask(currWork, bigTablePosition).getFirst();
-
-          newTask.setTaskTag(Task.MAPJOIN_ONLY_NOBACKUP);
-          replaceTask(currTask, newTask, physicalContext);
-
-          // Can this task be merged with the child task? This can happen if a big table is being
-          // joined with multiple small tables on different keys.
-          // Further optimizations are possible here: a join which has been converted to a mapjoin,
-          // followed by another mapjoin, can be performed in a single MR job.
-          if ((newTask.getChildTasks() != null) && (newTask.getChildTasks().size() == 1)
-              && (newTask.getChildTasks().get(0).getTaskTag() == Task.MAPJOIN_ONLY_NOBACKUP)) {
-            mergeMapJoinTaskWithChildMapJoinTask(newTask);
-          }
-
-          return newTask;
-        }
-
-        long ThresholdOfSmallTblSizeSum = HiveConf.getLongVar(conf,
-            HiveConf.ConfVars.HIVESMALLTABLESFILESIZE);
-        String xml = currWork.toXML();
-        for (int i = 0; i < numAliases; i++) {
-          // this table cannot be big table
-          if (!bigTableCandidates.contains(i)) {
-            continue;
-          }
-
-          // deep copy a new mapred work from xml
-          InputStream in = new ByteArrayInputStream(xml.getBytes("UTF-8"));
-          MapredWork newWork = Utilities.deserializeMapRedWork(in, physicalContext.getConf());
-
-          // create map join task and set big table as i
-          ObjectPair<MapRedTask, String> newTaskAlias = convertTaskToMapJoinTask(newWork, i);          
-          MapRedTask newTask = newTaskAlias.getFirst();
-          bigTableAlias = newTaskAlias.getSecond();
-
-          Long aliasKnownSize = aliasToSize.get(bigTableAlias);
-          if (aliasKnownSize != null && aliasKnownSize.longValue() > 0) {
-            long smallTblTotalKnownSize = aliasTotalKnownInputSize
-                - aliasKnownSize.longValue();
-            if(smallTblTotalKnownSize > ThresholdOfSmallTblSizeSum) {
-              // This table is not a good choice for the big table.
-              continue;
-            }
-          }
-
-          // add into conditional task
-          listWorks.add(newTask.getWork());
-          listTasks.add(newTask);
-          newTask.setTaskTag(Task.CONVERTED_MAPJOIN);
-
-          //set up backup task
-          newTask.setBackupTask(currTask);
-          newTask.setBackupChildrenTasks(currTask.getChildTasks());
-
-          // put the mapping alias to task
-          aliasToTask.put(bigTableAlias, newTask);
-        }
-      } catch (Exception e) {
-        e.printStackTrace();
-        throw new SemanticException("Generate Map Join Task Error: " + e.getMessage());
-      }
-
-      // insert current common join task to conditional task
-      listWorks.add(currTask.getWork());
-      listTasks.add(currTask);
-      // clear JoinTree and OP Parse Context
-      currWork.setOpParseCtxMap(null);
-      currWork.setJoinTree(null);
-
-      // create conditional task and insert conditional task into task tree
-      ConditionalWork cndWork = new ConditionalWork(listWorks);
-      ConditionalTask cndTsk = (ConditionalTask) TaskFactory.get(cndWork, parseCtx.getConf());
-      cndTsk.setListTasks(listTasks);
-
-      // set resolver and resolver context
-      cndTsk.setResolver(new ConditionalResolverCommonJoin());
-      ConditionalResolverCommonJoinCtx resolverCtx = new ConditionalResolverCommonJoinCtx();
-      resolverCtx.setPathToAliases(pathToAliases);
-      resolverCtx.setAliasToKnownSize(aliasToSize);
-      resolverCtx.setAliasToTask(aliasToTask);
-      resolverCtx.setCommonJoinTask(currTask);
-      resolverCtx.setLocalTmpDir(context.getLocalScratchDir(false));
-      resolverCtx.setHdfsTmpDir(context.getMRScratchDir());
-      cndTsk.setResolverCtx(resolverCtx);
-
-      //replace the current task with the new generated conditional task
-      this.replaceTaskWithConditionalTask(currTask, cndTsk, physicalContext);
-      return cndTsk;
-    }
-
-    private void replaceTaskWithConditionalTask(
-        Task<? extends Serializable> currTask, ConditionalTask cndTsk,
-        PhysicalContext physicalContext) {
-      // add this task into task tree
-      // set all parent tasks
-      List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
-      currTask.setParentTasks(null);
-      if (parentTasks != null) {
-        for (Task<? extends Serializable> tsk : parentTasks) {
-          // make the newly generated task depend on all the parent tasks of the current task
-          tsk.addDependentTask(cndTsk);
-          // remove the current task from its original parent's list of dependent tasks
-          tsk.removeDependentTask(currTask);
-        }
-      } else {
-        // remove the current task from the root tasks and add the conditional task in its place
-        physicalContext.removeFromRootTask(currTask);
-        physicalContext.addToRootTask(cndTsk);
-      }
-      // set all child tasks
-      List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
-      if (oldChildTasks != null) {
-        for (Task<? extends Serializable> tsk : cndTsk.getListTasks()) {
-          if (tsk.equals(currTask)) {
-            continue;
-          }
-          for (Task<? extends Serializable> oldChild : oldChildTasks) {
-            tsk.addDependentTask(oldChild);
-          }
-        }
-      }
-    }
-
-    // Replace the task with the new task. Copy the children and parents of the old
-    // task to the new task.
-    private void replaceTask(
-        Task<? extends Serializable> currTask, Task<? extends Serializable> newTask,
-        PhysicalContext physicalContext) {
-      // add this task into task tree
-      // set all parent tasks
-      List<Task<? extends Serializable>> parentTasks = currTask.getParentTasks();
-      currTask.setParentTasks(null);
-      if (parentTasks != null) {
-        for (Task<? extends Serializable> tsk : parentTasks) {
-          // remove the current task from its original parent's list of dependent tasks
-          tsk.removeDependentTask(currTask);
-          // make the newly generated task depend on all the parent tasks of the current task
-          tsk.addDependentTask(newTask);
-        }
-      } else {
-        // remove the current task from the root tasks and add the new task in its place
-        physicalContext.removeFromRootTask(currTask);
-        physicalContext.addToRootTask(newTask);
-      }
-
-      // set all child tasks
-      List<Task<? extends Serializable>> oldChildTasks = currTask.getChildTasks();
-      currTask.setChildTasks(null);
-      if (oldChildTasks != null) {
-        for (Task<? extends Serializable> tsk : oldChildTasks) {
-          // remove the current task from the child task's parent list
-          tsk.getParentTasks().remove(currTask);
-          // make the child task depend on the newly generated task instead
-          newTask.addDependentTask(tsk);
-        }
-      }
-    }
-
-    @Override
-    public Object dispatch(Node nd, Stack<Node> stack, Object... nodeOutputs)
-        throws SemanticException {
-      if (nodeOutputs == null || nodeOutputs.length == 0) {
-        throw new SemanticException("No Dispatch Context");
-      }
-
-      TaskGraphWalkerContext walkerCtx = (TaskGraphWalkerContext) nodeOutputs[0];
-
-      Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
-      // If this is neither a map-reduce task nor a conditional task, just skip it.
-      if (currTask.isMapRedTask()) {
-        if (currTask instanceof ConditionalTask) {
-          // get the list of task
-          List<Task<? extends Serializable>> taskList = ((ConditionalTask) currTask).getListTasks();
-          for (Task<? extends Serializable> tsk : taskList) {
-            if (tsk.isMapRedTask()) {
-              Task<? extends Serializable> newTask = this.processCurrentTask((MapRedTask) tsk,
-                  ((ConditionalTask) currTask), physicalContext.getContext());
-              walkerCtx.addToDispatchList(newTask);
-            }
-          }
-        } else {
-          Task<? extends Serializable> newTask =
-              this.processCurrentTask((MapRedTask) currTask, null, physicalContext.getContext());
-          walkerCtx.addToDispatchList(newTask);
-        }
-      }
-      return null;
-    }
-
-    /*
-     * If any operator which does not allow map-side conversion is present in the mapper, don't
-     * convert the task into a conditional task.
-     */
-    private boolean checkOperatorOKMapJoinConversion(Operator<? extends OperatorDesc> op) {
-      if (!op.opAllowedConvertMapJoin()) {
-        return false;
-      }
-
-      if (op.getChildOperators() == null) {
-        return true;
-      }
-
-      for (Operator<? extends OperatorDesc> childOp : op.getChildOperators()) {
-        if (!checkOperatorOKMapJoinConversion(childOp)) {
-          return false;
-        }
-      }
-
-      return true;
-    }
-
-    private JoinOperator getJoinOp(MapRedTask task) throws SemanticException {
-      MapredWork work = task.getWork();
-      if (work == null) {
-        return null;
-      }
-      Operator<? extends OperatorDesc> reducerOp = work.getReducer();
-      if (reducerOp instanceof JoinOperator) {
-        /* Is any operator present, which prevents the conversion */
-        Map<String, Operator<? extends OperatorDesc>> aliasToWork = work.getAliasToWork();
-        for (Operator<? extends OperatorDesc> op : aliasToWork.values()) {
-          if (!checkOperatorOKMapJoinConversion(op)) {
-            return null;
-          }
-        }
-        return (JoinOperator) reducerOp;
-      } else {
-        return null;
-      }
-    }
-  }
 }
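
The bulk of the deletion above removes the inner CommonJoinTaskDispatcher from this resolver (the dispatcher logic is presumably relocated elsewhere in this merge). Its central decision is whether a common join can become a map-join with no conditional backup task: at most one input, which must be a legal big-table candidate, may have an unknown or over-threshold size, and the remaining inputs must together stay under the HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD value read above. The following standalone sketch mirrors that selection rule with hypothetical names and simplified inputs; it is not the Hive API.

import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

public final class BigTableSelectionSketch {

  /**
   * Returns the chosen big-table position, or -1 when the join cannot be converted
   * to a map-join without keeping a conditional backup task.
   *
   * @param aliasToSize   known input size per alias; a null value means the size is unknown
   * @param aliasToPos    join position of each alias
   * @param bigCandidates positions allowed to be the big table (outer-join constraints)
   * @param threshold     the no-conditional-task size threshold
   */
  static int selectBigTable(Map<String, Long> aliasToSize, Map<String, Integer> aliasToPos,
      Set<Integer> bigCandidates, long threshold) {
    boolean bigTableFound = false;
    long largestCandidateSize = 0;
    long sumSmallTableSizes = 0;
    int bigTablePosition = -1;

    for (Map.Entry<String, Long> entry : aliasToSize.entrySet()) {
      int pos = aliasToPos.get(entry.getKey());
      boolean candidate = bigCandidates.contains(pos);
      Long size = entry.getValue();

      if (size == null || size > threshold) {
        // An unknown or oversized input must be the big table; only one such input is allowed.
        sumSmallTableSizes += largestCandidateSize;
        if (bigTableFound || sumSmallTableSizes > threshold || !candidate) {
          return -1;
        }
        bigTableFound = true;
        bigTablePosition = pos;
        largestCandidateSize = threshold + 1; // later known candidates cannot displace it
      } else if (candidate && size > largestCandidateSize) {
        // A larger known candidate displaces the previous candidate into the small-table sum.
        sumSmallTableSizes += largestCandidateSize;
        bigTablePosition = pos;
        largestCandidateSize = size;
      } else {
        sumSmallTableSizes += size;
      }

      if (sumSmallTableSizes > threshold) {
        return -1;
      }
    }
    return bigTablePosition;
  }

  public static void main(String[] args) {
    Map<String, Long> sizes = new LinkedHashMap<String, Long>();
    sizes.put("fact", 900000L); // too large to build a hash table from
    sizes.put("dim", 10000L);   // small enough to broadcast
    Map<String, Integer> positions = new LinkedHashMap<String, Integer>();
    positions.put("fact", 0);
    positions.put("dim", 1);
    Set<Integer> candidates = new HashSet<Integer>(Arrays.asList(0, 1));

    // Expect 0: "fact" exceeds the threshold and becomes the big table; "dim" fits under it.
    System.out.println(selectBigTable(sizes, positions, candidates, 100000L));
  }
}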

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Fri Apr 26 19:14:49 2013
@@ -51,7 +51,15 @@ public class PhysicalOptimizer {
     }
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       resolvers.add(new CommonJoinResolver());
+
+      // The joins have been automatically converted to map-joins.
+      // However, if the joins were converted to sort-merge joins automatically,
+      // they should also be tried as map-joins.
+      if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_TOMAPJOIN)) {
+        resolvers.add(new SortMergeJoinResolver());
+      }
     }
+
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER)) {
       resolvers.add(new IndexWhereResolver());
     }
@@ -61,7 +69,7 @@ public class PhysicalOptimizer {
     }
 
     // Physical optimizers which follow this need to be careful not to invalidate the inferences
-    // made by this optimizer.  Only optimizers which depend on the results of this one should
+    // made by this optimizer. Only optimizers which depend on the results of this one should
     // follow it.
     if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT)) {
       resolvers.add(new BucketingSortingInferenceOptimizer());
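
As the hunk above shows, the new SortMergeJoinResolver is only registered when automatic join conversion is enabled and HIVE_AUTO_SORTMERGE_JOIN_TOMAPJOIN is set. A small sketch of how a test or driver might enable both flags programmatically (assuming HiveConf and its dependencies are on the classpath; only the ConfVars constants visible in the diff are used):

import org.apache.hadoop.hive.conf.HiveConf;

public class EnableSortMergeToMapJoin {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Prerequisite: automatic conversion of common joins to map-joins.
    conf.setBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN, true);
    // New in this change: also try to convert auto sort-merge joins to map-joins.
    conf.setBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_TOMAPJOIN, true);

    System.out.println(conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_TOMAPJOIN));
  }
}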

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcContext.java Fri Apr 26 19:14:49 2013
@@ -40,7 +40,7 @@ public class UnionProcContext implements
     private final transient boolean[] mapOnlySubqSet;
     private final transient boolean[] rootTask;
 
-    private transient int numInputs;
+    private final transient int numInputs;
 
     public UnionParseContext(int numInputs) {
       this.numInputs = numInputs;
@@ -70,27 +70,22 @@ public class UnionProcContext implements
       return numInputs;
     }
 
-    public void setNumInputs(int numInputs) {
-      this.numInputs = numInputs;
-    }
-
     public boolean allMapOnlySubQ() {
-      if (mapOnlySubq != null) {
-        for (boolean mapOnly : mapOnlySubq) {
-          if (!mapOnly) {
-            return false;
-          }
-        }
-      }
-      return true;
+      return isAllTrue(mapOnlySubq);
     }
 
     public boolean allMapOnlySubQSet() {
-      if (mapOnlySubqSet != null) {
-        for (boolean mapOnlySet : mapOnlySubqSet) {
-          if (!mapOnlySet) {
-            return false;
-          }
+      return isAllTrue(mapOnlySubqSet);
+    }
+
+    public boolean allRootTasks() {
+      return isAllTrue(rootTask);
+    }
+
+    public boolean isAllTrue(boolean[] array) {
+      for (boolean value : array) {
+        if (!value) {
+          return false;
         }
       }
       return true;
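
The refactoring above folds three near-identical loops into a single isAllTrue helper and adds an allRootTasks accessor; the helper drops the old null guard, which is presumably safe because the flag arrays are final and sized in the constructor. A stripped-down illustration of the consolidated check (the class name here is only for the example):

public final class AllTrueSketch {

  // Mirrors UnionParseContext.isAllTrue: every flag must be set; an empty array counts as all-true.
  static boolean isAllTrue(boolean[] flags) {
    for (boolean value : flags) {
      if (!value) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(isAllTrue(new boolean[] {true, true, true}));  // true
    System.out.println(isAllTrue(new boolean[] {true, false}));       // false
    System.out.println(isAllTrue(new boolean[0]));                    // true (vacuously)
  }
}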

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/unionproc/UnionProcFactory.java Fri Apr 26 19:14:49 2013
@@ -138,20 +138,21 @@ public final class UnionProcFactory {
         }
         start--;
       }
+      assert parentUnionOperator != null;
 
       // default to false
       boolean mapOnly = false;
-      if (parentUnionOperator != null) {
-        UnionParseContext parentUCtx =
+      boolean rootTask = false;
+      UnionParseContext parentUCtx =
           ctx.getUnionParseContext(parentUnionOperator);
-        if (parentUCtx != null && parentUCtx.allMapOnlySubQSet()) {
-          mapOnly = parentUCtx.allMapOnlySubQ();
-        }
+      if (parentUCtx != null && parentUCtx.allMapOnlySubQSet()) {
+        mapOnly = parentUCtx.allMapOnlySubQ();
+        rootTask = parentUCtx.allRootTasks();
       }
 
       uCtx.setMapOnlySubq(pos, mapOnly);
 
-      uCtx.setRootTask(pos, false);
+      uCtx.setRootTask(pos, rootTask);
       ctx.setUnionParseContext(union, uCtx);
       return null;
     }

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java?rev=1476348&r1=1476347&r2=1476348&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java Fri Apr 26 19:14:49 2013
@@ -37,6 +37,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SkewedValueList;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryProperties;
@@ -965,7 +966,7 @@ public abstract class BaseSemanticAnalyz
    * @return
    */
   protected ListBucketingCtx constructListBucketingCtx(List<String> skewedColNames,
-      List<List<String>> skewedValues, Map<List<String>, String> skewedColValueLocationMaps,
+      List<List<String>> skewedValues, Map<SkewedValueList, String> skewedColValueLocationMaps,
       boolean isStoredAsSubDirectories, HiveConf conf) {
     ListBucketingCtx lbCtx = new ListBucketingCtx();
     lbCtx.setSkewedColNames(skewedColNames);