Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 00:26:58 UTC

svn commit: r1629544 [9/33] - in /hive/branches/spark-new: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ co...

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Sun Oct  5 22:26:43 2014
@@ -109,8 +109,8 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.thrift.TException;
 
 import com.google.common.collect.Sets;
@@ -378,6 +378,27 @@ public class Hive {
       List<String> partCols, Class<? extends InputFormat> fileInputFormat,
       Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols)
       throws HiveException {
+    createTable(tableName, columns, partCols, fileInputFormat, fileOutputFormat, bucketCount,
+        bucketCols, null);
+  }
+
+  /**
+   * Create a table metadata and the directory for the table data
+   * @param tableName table name
+   * @param columns list of fields of the table
+   * @param partCols partition keys of the table
+   * @param fileInputFormat Class of the input format of the table data file
+   * @param fileOutputFormat Class of the output format of the table data file
+   * @param bucketCount number of buckets that each partition (or the table itself) should be
+   *                    divided into
+   * @param bucketCols Bucket columns
+   * @param parameters Parameters for the table
+   * @throws HiveException
+   */
+  public void createTable(String tableName, List<String> columns, List<String> partCols,
+                          Class<? extends InputFormat> fileInputFormat,
+                          Class<?> fileOutputFormat, int bucketCount, List<String> bucketCols,
+                          Map<String, String> parameters) throws HiveException {
     if (columns == null) {
       throw new HiveException("columns not specified for table " + tableName);
     }
@@ -402,6 +423,9 @@ public class Hive {
     tbl.setSerializationLib(LazySimpleSerDe.class.getName());
     tbl.setNumBuckets(bucketCount);
     tbl.setBucketCols(bucketCols);
+    if (parameters != null) {
+      tbl.setParamters(parameters);
+    }
     createTable(tbl);
   }
 
@@ -427,9 +451,9 @@ public class Hive {
       newTbl.checkValidity();
       getMSC().alter_table(names[0], names[1], newTbl.getTTable());
     } catch (MetaException e) {
-      throw new HiveException("Unable to alter table.", e);
+      throw new HiveException("Unable to alter table. " + e.getMessage(), e);
     } catch (TException e) {
-      throw new HiveException("Unable to alter table.", e);
+      throw new HiveException("Unable to alter table. " + e.getMessage(), e);
     }
   }
 
@@ -455,9 +479,9 @@ public class Hive {
     try {
       getMSC().alter_index(dbName, baseTblName, idxName, newIdx);
     } catch (MetaException e) {
-      throw new HiveException("Unable to alter index.", e);
+      throw new HiveException("Unable to alter index. " + e.getMessage(), e);
     } catch (TException e) {
-      throw new HiveException("Unable to alter index.", e);
+      throw new HiveException("Unable to alter index. " + e.getMessage(), e);
     }
   }
 
@@ -502,9 +526,9 @@ public class Hive {
       getMSC().alter_partition(dbName, tblName, newPart.getTPartition());
 
     } catch (MetaException e) {
-      throw new HiveException("Unable to alter partition.", e);
+      throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
     } catch (TException e) {
-      throw new HiveException("Unable to alter partition.", e);
+      throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
     }
   }
 
@@ -534,9 +558,9 @@ public class Hive {
       }
       getMSC().alter_partitions(names[0], names[1], newTParts);
     } catch (MetaException e) {
-      throw new HiveException("Unable to alter partition.", e);
+      throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
     } catch (TException e) {
-      throw new HiveException("Unable to alter partition.", e);
+      throw new HiveException("Unable to alter partition. " + e.getMessage(), e);
     }
   }
   /**
@@ -578,11 +602,11 @@ public class Hive {
           newPart.getTPartition());
 
     } catch (InvalidOperationException e){
-      throw new HiveException("Unable to rename partition.", e);
+      throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
     } catch (MetaException e) {
-      throw new HiveException("Unable to rename partition.", e);
+      throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
     } catch (TException e) {
-      throw new HiveException("Unable to rename partition.", e);
+      throw new HiveException("Unable to rename partition. " + e.getMessage(), e);
     }
   }
 
@@ -591,11 +615,11 @@ public class Hive {
     try {
       getMSC().alterDatabase(dbName, db);
     } catch (MetaException e) {
-      throw new HiveException("Unable to alter database " + dbName, e);
+      throw new HiveException("Unable to alter database " + dbName + ". " + e.getMessage(), e);
     } catch (NoSuchObjectException e) {
       throw new HiveException("Database " + dbName + " does not exists.", e);
     } catch (TException e) {
-      throw new HiveException("Unable to alter database " + dbName, e);
+      throw new HiveException("Unable to alter database " + dbName + ". " + e.getMessage(), e);
     }
   }
   /**
@@ -870,14 +894,31 @@ public class Hive {
     try {
       return getMSC().dropIndex(db_name, tbl_name, index_name, deleteData);
     } catch (NoSuchObjectException e) {
-      throw new HiveException("Partition or table doesn't exist.", e);
+      throw new HiveException("Partition or table doesn't exist. " + e.getMessage(), e);
     } catch (Exception e) {
-      throw new HiveException("Unknown error. Please check logs.", e);
+      throw new HiveException(e.getMessage(), e);
     }
   }
 
   /**
    * Drops table along with the data in it. If the table doesn't exist then it
+   * is a no-op. If ifPurge option is specified it is passed to the
+   * hdfs command that removes table data from warehouse to make it skip trash.
+   *
+   * @param tableName
+   *          table to drop
+   * @param ifPurge
+   *          completely purge the table (skipping trash) while removing data from warehouse
+   * @throws HiveException
+   *           thrown if the drop fails
+   */
+  public void dropTable(String tableName, boolean ifPurge) throws HiveException {
+    String[] names = Utilities.getDbTableName(tableName);
+    dropTable(names[0], names[1], true, true, ifPurge);
+  }
+
+  /**
+   * Drops table along with the data in it. If the table doesn't exist then it
    * is a no-op
    *
    * @param tableName
@@ -886,8 +927,7 @@ public class Hive {
    *           thrown if the drop fails
    */
   public void dropTable(String tableName) throws HiveException {
-    String[] names = Utilities.getDbTableName(tableName);
-    dropTable(names[0], names[1], true, true);
+    dropTable(tableName, false);
   }
 
   /**
@@ -902,7 +942,7 @@ public class Hive {
    *           thrown if the drop fails
    */
   public void dropTable(String dbName, String tableName) throws HiveException {
-    dropTable(dbName, tableName, true, true);
+    dropTable(dbName, tableName, true, true, false);
   }
 
   /**
@@ -913,14 +953,31 @@ public class Hive {
    * @param deleteData
    *          deletes the underlying data along with metadata
    * @param ignoreUnknownTab
-   *          an exception if thrown if this is falser and table doesn't exist
+   *          an exception is thrown if this is false and the table doesn't exist
    * @throws HiveException
    */
   public void dropTable(String dbName, String tableName, boolean deleteData,
       boolean ignoreUnknownTab) throws HiveException {
+    dropTable(dbName, tableName, deleteData, ignoreUnknownTab, false);
+  }
 
+  /**
+   * Drops the table.
+   *
+   * @param dbName
+   * @param tableName
+   * @param deleteData
+   *          deletes the underlying data along with metadata
+   * @param ignoreUnknownTab
+   *          an exception is thrown if this is false and the table doesn't exist
+   * @param ifPurge
+   *          completely purge the table skipping trash while removing data from warehouse
+   * @throws HiveException
+   */
+  public void dropTable(String dbName, String tableName, boolean deleteData,
+      boolean ignoreUnknownTab, boolean ifPurge) throws HiveException {
     try {
-      getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab);
+      getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab, ifPurge);
     } catch (NoSuchObjectException e) {
       if (!ignoreUnknownTab) {
         throw new HiveException(e);
@@ -1008,7 +1065,7 @@ public class Hive {
       }
       return null;
     } catch (Exception e) {
-      throw new HiveException("Unable to fetch table " + tableName, e);
+      throw new HiveException("Unable to fetch table " + tableName + ". " + e.getMessage(), e);
     }
 
     // For non-views, we need to do some extra fixes
@@ -1204,6 +1261,15 @@ public class Hive {
     return getDatabase(currentDb);
   }
 
+  public void loadPartition(Path loadPath, String tableName,
+      Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
+      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
+      boolean isSrcLocal, boolean isAcid) throws HiveException {
+    Table tbl = getTable(tableName);
+    loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs,
+        isSkewedStoreAsSubdir, isSrcLocal, isAcid);
+  }
+
   /**
    * Load a directory into a Hive Table Partition - Alters existing content of
    * the partition with the contents of loadPath. - If the partition does not
@@ -1212,7 +1278,7 @@ public class Hive {
    *
    * @param loadPath
    *          Directory containing files to load into Table
-   * @param tableName
+   * @param  tbl
    *          name of table to be loaded.
    * @param partSpec
    *          defines which partition needs to be loaded
@@ -1225,12 +1291,12 @@ public class Hive {
    * @param isSrcLocal
    *          If the source directory is LOCAL
    */
-  public void loadPartition(Path loadPath, String tableName,
+  public Partition loadPartition(Path loadPath, Table tbl,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid) throws HiveException {
-    Table tbl = getTable(tableName);
     Path tblDataLocationPath =  tbl.getDataLocation();
+    Partition newTPart = null;
     try {
       /**
        * Move files before creating the partition since down stream processes
@@ -1279,10 +1345,10 @@ public class Hive {
         Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
       }
 
+      boolean forceCreate = (!holdDDLTime) ? true : false;
+      newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
       // recreate the partition if it existed before
       if (!holdDDLTime) {
-        Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
-            inheritTableSpecs);
         if (isSkewedStoreAsSubdir) {
           org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
           SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
@@ -1292,9 +1358,9 @@ public class Hive {
           /* Add list bucketing location mappings. */
           skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
           newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
-          alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+          alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
           newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
-          newCreatedTpart = newTPart.getTPartition();
+          return new Partition(tbl, newCreatedTpart);
         }
       }
     } catch (IOException e) {
@@ -1307,7 +1373,7 @@ public class Hive {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
     }
-
+    return newTPart;
   }
 
   /**
@@ -1403,18 +1469,18 @@ private void constructOneLBLocationMap(F
    * @param replace
    * @param numDP number of dynamic partitions
    * @param holdDDLTime
-   * @return a list of strings with the dynamic partition paths
+   * @return partition map details (PartitionSpec and Partition)
    * @throws HiveException
    */
-  public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
+  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
       int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
       throws HiveException {
 
     Set<Path> validPartitions = new HashSet<Path>();
     try {
-      ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
-        new ArrayList<LinkedHashMap<String, String>>();
+      Map<Map<String, String>, Partition> partitionsMap = new
+          LinkedHashMap<Map<String, String>, Partition>();
 
       FileSystem fs = loadPath.getFileSystem(conf);
       FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
@@ -1448,6 +1514,7 @@ private void constructOneLBLocationMap(F
             + " to at least " + validPartitions.size() + '.');
       }
 
+      Table tbl = getTable(tableName);
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
       Iterator<Path> iter = validPartitions.iterator();
@@ -1460,14 +1527,12 @@ private void constructOneLBLocationMap(F
         // generate a full partition specification
         LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
         Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        fullPartSpecs.add(fullPartSpec);
-
-        // finally load the partition -- move the file to the final table address
-        loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
-            listBucketingEnabled, false, isAcid);
+        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
+            holdDDLTime, true, listBucketingEnabled, false, isAcid);
+        partitionsMap.put(fullPartSpec, newPartition);
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
-      return fullPartSpecs;
+      return partitionsMap;
     } catch (IOException e) {
       throw new HiveException(e);
     }
@@ -1500,6 +1565,7 @@ private void constructOneLBLocationMap(F
       tbl.replaceFiles(loadPath, isSrcLocal);
     } else {
       tbl.copyFiles(loadPath, isSrcLocal, isAcid);
+      tbl.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK, "true");
     }
 
     try {
@@ -1613,17 +1679,6 @@ private void constructOneLBLocationMap(F
     return getPartition(tbl, partSpec, forceCreate, null, true);
   }
 
-  private static void clearPartitionStats(org.apache.hadoop.hive.metastore.api.Partition tpart) {
-    Map<String,String> tpartParams = tpart.getParameters();
-    if (tpartParams == null) {
-      return;
-    }
-
-    for (String statType : StatsSetupConst.supportedStats) {
-      tpartParams.remove(statType);
-    }
-  }
-
   /**
    * Returns partition metadata
    *
@@ -1691,7 +1746,7 @@ private void constructOneLBLocationMap(F
             throw new HiveException("new partition path should not be null or empty.");
           }
           tpart.getSd().setLocation(partPath);
-          clearPartitionStats(tpart);
+          tpart.getParameters().put(StatsSetupConst.STATS_GENERATED_VIA_STATS_TASK,"true");
           String fullName = tbl.getTableName();
           if (!org.apache.commons.lang.StringUtils.isEmpty(tbl.getDbName())) {
             fullName = tbl.getDbName() + "." + tbl.getTableName();
@@ -1722,7 +1777,7 @@ private void constructOneLBLocationMap(F
     } catch (NoSuchObjectException e) {
       throw new HiveException("Partition or table doesn't exist.", e);
     } catch (Exception e) {
-      throw new HiveException("Unknown error. Please check logs.", e);
+      throw new HiveException(e.getMessage(), e);
     }
   }
 
@@ -1736,6 +1791,7 @@ private void constructOneLBLocationMap(F
   public List<Partition> dropPartitions(String dbName, String tblName,
       List<DropTableDesc.PartSpec> partSpecs,  boolean deleteData, boolean ignoreProtection,
       boolean ifExists) throws HiveException {
+    //TODO: add support for ifPurge
     try {
       Table tbl = getTable(dbName, tblName);
       List<ObjectPair<Integer, byte[]>> partExprs =
@@ -1750,7 +1806,7 @@ private void constructOneLBLocationMap(F
     } catch (NoSuchObjectException e) {
       throw new HiveException("Partition or table doesn't exist.", e);
     } catch (Exception e) {
-      throw new HiveException("Unknown error. Please check logs.", e);
+      throw new HiveException(e.getMessage(), e);
     }
   }
 
@@ -2243,7 +2299,7 @@ private void constructOneLBLocationMap(F
         result.add(srcToDest);
       }
     } catch (IOException e) {
-      throw new HiveException("checkPaths: filesystem error in check phase", e);
+      throw new HiveException("checkPaths: filesystem error in check phase. " + e.getMessage(), e);
     }
     return result;
   }
@@ -2310,7 +2366,7 @@ private void constructOneLBLocationMap(F
       try {
         ShimLoader.getHadoopShims().setFullFileStatus(conf, destStatus, fs, destf);
       } catch (IOException e) {
-        LOG.warn("Error setting permission of file " + destf + ": "+ StringUtils.stringifyException(e));
+        LOG.warn("Error setting permission of file " + destf + ": "+ e.getMessage(), e);
       }
     }
     return success;
@@ -2349,7 +2405,7 @@ private void constructOneLBLocationMap(F
       srcs = srcFs.globStatus(srcf);
     } catch (IOException e) {
       LOG.error(StringUtils.stringifyException(e));
-      throw new HiveException("addFiles: filesystem error in check phase", e);
+      throw new HiveException("addFiles: filesystem error in check phase. " + e.getMessage(), e);
     }
     if (srcs == null) {
       LOG.info("No sources specified to move: " + srcf);
@@ -2375,7 +2431,7 @@ private void constructOneLBLocationMap(F
           }
         }
       } catch (IOException e) {
-        throw new HiveException("copyFiles: error while moving files!!!", e);
+        throw new HiveException("copyFiles: error while moving files!!! " + e.getMessage(), e);
       }
     }
   }
@@ -2447,7 +2503,7 @@ private void constructOneLBLocationMap(F
               fs.rename(bucketSrc, bucketDest);
             }
           } catch (IOException e) {
-            throw new HiveException("Error moving acid files", e);
+            throw new HiveException("Error moving acid files " + e.getMessage(), e);
           }
         }
       }
@@ -2679,7 +2735,7 @@ private void constructOneLBLocationMap(F
       throw new HiveException(e);
     }
   }
-  
+
   public boolean setPartitionColumnStatistics(SetPartitionsStatsRequest request) throws HiveException {
     try {
       return getMSC().setPartitionColumnStatistics(request);

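The Hive.java changes above add a createTable overload that accepts table parameters, dropTable overloads that take an ifPurge flag (skipping the trash when warehouse data is removed), and make loadPartition/loadDynamicPartitions return the created Partition objects. A minimal usage sketch of the new overloads, assuming a session obtained via Hive.get(conf); the table name and the input/output format classes below are only illustrative and are not part of this commit:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
    import org.apache.hadoop.hive.ql.metadata.Hive;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.mapred.TextInputFormat;

    public class PurgeExample {
      public static void main(String[] args) throws HiveException {
        Hive db = Hive.get(new HiveConf());      // session-scoped metadata client

        // New createTable overload: pass table parameters along with the schema.
        Map<String, String> params = new HashMap<String, String>();
        params.put("comment", "scratch table");
        db.createTable("scratch_tbl",
            Arrays.asList("id", "name"),         // columns
            null,                                // no partition columns
            TextInputFormat.class,
            HiveIgnoreKeyTextOutputFormat.class,
            -1, null,                            // no bucketing
            params);

        // New dropTable overload: ifPurge=true skips the trash when deleting data.
        db.dropTable("scratch_tbl", true);
      }
    }
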
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Sun Oct  5 22:26:43 2014
@@ -451,7 +451,11 @@ public class SessionHiveMetaStoreClient 
     // Delete table data
     if (deleteData && !MetaStoreUtils.isExternalTable(table)) {
       try {
-        getWh().deleteDir(tablePath, true);
+        boolean ifPurge = false;
+        if (envContext != null){
+          ifPurge = Boolean.parseBoolean(envContext.getProperties().get("ifPurge"));
+        }
+        getWh().deleteDir(tablePath, true, ifPurge);
       } catch (Exception err) {
         LOG.error("Failed to delete temp table directory: " + tablePath, err);
         // Forgive error

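The temp-table drop path above now reads an "ifPurge" flag from the EnvironmentContext properties and passes it through to the warehouse deleteDir call. A hedged sketch of how a caller might populate that flag, assuming the Thrift-generated EnvironmentContext.putToProperties; illustrative only, not part of this commit:

    import org.apache.hadoop.hive.metastore.api.EnvironmentContext;

    public class IfPurgeContextExample {
      // Builds the context whose "ifPurge" property the temp-table drop path
      // above parses with Boolean.parseBoolean(...).
      public static EnvironmentContext purgeContext() {
        EnvironmentContext envContext = new EnvironmentContext();
        envContext.putToProperties("ifPurge", "true"); // skip the trash when deleting data
        return envContext;
      }
    }
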
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java Sun Oct  5 22:26:43 2014
@@ -385,6 +385,10 @@ public class Table implements Serializab
     tTable.getParameters().put(name, value);
   }
 
+  public void setParamters(Map<String, String> params) {
+    tTable.setParameters(params);
+  }
+
   public String getProperty(String name) {
     return tTable.getParameters().get(name);
   }

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Sun Oct  5 22:26:43 2014
@@ -670,10 +670,15 @@ public final class ConstantPropagateProc
       cppCtx.getOpToConstantExprs().put(op, constants);
       foldOperator(op, cppCtx);
       List<ExprNodeDesc> colList = op.getConf().getColList();
+      List<String> columnNames = op.getConf().getOutputColumnNames();
+      Map<String, ExprNodeDesc> columnExprMap = op.getColumnExprMap();
       if (colList != null) {
         for (int i = 0; i < colList.size(); i++) {
           ExprNodeDesc newCol = foldExpr(colList.get(i), constants, cppCtx, op, 0, false);
           colList.set(i, newCol);
+          if (columnExprMap != null) {
+            columnExprMap.put(columnNames.get(i), newCol);
+          }
         }
         LOG.debug("New column list:(" + StringUtils.join(colList, " ") + ")");
       }

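The constant-propagation hunk above folds each select expression and writes the folded expression back into the operator's output-name-to-expression map, so the column list and the map stay consistent. A toy sketch of that keep-in-sync bookkeeping; the Expr and fold names are hypothetical stand-ins for ExprNodeDesc and foldExpr:

    import java.util.List;
    import java.util.Map;

    public class FoldInSyncExample {
      interface Expr {}                                  // toy stand-in for ExprNodeDesc
      static Expr fold(Expr e) { return e; }             // toy stand-in for foldExpr(...)

      // Fold each expression and keep the name->expression map consistent,
      // mirroring what the patch does for colList and columnExprMap.
      static void foldAll(List<Expr> colList, List<String> outputNames,
                          Map<String, Expr> columnExprMap) {
        for (int i = 0; i < colList.size(); i++) {
          Expr folded = fold(colList.get(i));
          colList.set(i, folded);
          if (columnExprMap != null) {
            columnExprMap.put(outputNames.get(i), folded); // same index, same output column
          }
        }
      }
    }
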
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Sun Oct  5 22:26:43 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -29,12 +30,17 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
+import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -42,12 +48,16 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
+import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
+import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
+import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
@@ -60,39 +70,46 @@ public class ConvertJoinMapJoin implemen
 
   static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
 
+  @SuppressWarnings("unchecked")
   @Override
-    /*
-     * (non-Javadoc)
-     * we should ideally not modify the tree we traverse.
-     * However, since we need to walk the tree at any time when we modify the
-     * operator, we might as well do it here.
-     */
-    public Object process(Node nd, Stack<Node> stack,
-        NodeProcessorCtx procCtx, Object... nodeOutputs)
-    throws SemanticException {
+  /*
+   * (non-Javadoc) we should ideally not modify the tree we traverse. However,
+   * since we need to walk the tree at any time when we modify the operator, we
+   * might as well do it here.
+   */
+  public Object
+      process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
+          throws SemanticException {
 
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
 
-    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
+    JoinOperator joinOp = (JoinOperator) nd;
+
+    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
+        && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
+      // we are just converting to a common merge join operator. The shuffle
+      // join in map-reduce case.
+      int pos = 0; // it doesn't matter which position we use in this case.
+      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
       return null;
     }
 
-    JoinOperator joinOp = (JoinOperator) nd;
-    // if we have traits, and table info is present in the traits, we know the 
+    // if we have traits, and table info is present in the traits, we know the
     // exact number of buckets. Else choose the largest number of estimated
     // reducers from the parent operators.
     int numBuckets = -1;
     int estimatedBuckets = -1;
+    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
       for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
         if (parentOp.getOpTraits().getNumBuckets() > 0) {
-          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? 
-              parentOp.getOpTraits().getNumBuckets() : numBuckets; 
+          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
+              parentOp.getOpTraits().getNumBuckets() : numBuckets;
         }
 
         if (parentOp instanceof ReduceSinkOperator) {
           ReduceSinkOperator rs = (ReduceSinkOperator)parentOp;
-          estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? 
+          estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
               rs.getConf().getNumReducers() : estimatedBuckets;
         }
       }
@@ -107,29 +124,80 @@ public class ConvertJoinMapJoin implemen
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets);
+    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
     if (mapJoinConversionPos < 0) {
-      // we cannot convert to bucket map join, we cannot convert to 
-      // map join either based on the size
+      // we cannot convert to bucket map join, we cannot convert to
+      // map join either based on the size. Check if we can convert to SMB join.
+      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
+        convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
+        return null;
+      }
+      Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
+      try {
+        bigTableMatcherClass =
+            (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
+                context.parseContext.getConf(),
+                HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
+      } catch (ClassNotFoundException e) {
+        throw new SemanticException(e.getMessage());
+      }
+
+      BigTableSelectorForAutoSMJ bigTableMatcher =
+          ReflectionUtils.newInstance(bigTableMatcherClass, null);
+      JoinDesc joinDesc = joinOp.getConf();
+      JoinCondDesc[] joinCondns = joinDesc.getConds();
+      Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
+      if (joinCandidates.isEmpty()) {
+        // This is a full outer join. This can never be a map-join
+        // of any type. So return false.
+        return false;
+      }
+      mapJoinConversionPos =
+          bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
+      if (mapJoinConversionPos < 0) {
+        // contains aliases from sub-query
+        // we are just converting to a common merge join operator. The shuffle
+        // join in map-reduce case.
+        int pos = 0; // it doesn't matter which position we use in this case.
+        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+        return null;
+      }
+
+      if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+        convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
+            tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
+      } else {
+        // we are just converting to a common merge join operator. The shuffle
+        // join in map-reduce case.
+        int pos = 0; // it doesn't matter which position we use in this case.
+        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+      }
       return null;
     }
 
-    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
-      if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) {
-        return null;
+    if (numBuckets > 1) {
+      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+        if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
+          return null;
+        }
       }
     }
 
     LOG.info("Convert to non-bucketed map join");
     // check if we can convert to map join no bucket scaling.
-    mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1);
+    mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
     if (mapJoinConversionPos < 0) {
+      // we are just converting to a common merge join operator. The shuffle
+      // join in map-reduce case.
+      int pos = 0; // it doesn't matter which position we use in this case.
+      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
       return null;
     }
 
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
     // map join operator by default has no bucket cols
-    mapJoinOp.setOpTraits(new OpTraits(null, -1));
+    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
+    mapJoinOp.setStatistics(joinOp.getStatistics());
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
       setAllChildrenTraitsToNull(childOp);
@@ -138,11 +206,107 @@ public class ConvertJoinMapJoin implemen
     return null;
   }
 
+  // replaces the join operator with a new CommonJoinOperator, removes the
+  // parent reduce sinks
+  private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
+      throws SemanticException {
+    ParseContext parseContext = context.parseContext;
+    MapJoinDesc mapJoinDesc = null;
+    if (adjustParentsChildren) {
+        mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
+            joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
+    } else {
+      JoinDesc joinDesc = joinOp.getConf();
+      // retain the original join desc in the map join.
+      mapJoinDesc =
+          new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
+              joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
+              joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
+    }
+
+    @SuppressWarnings("unchecked")
+    CommonMergeJoinOperator mergeJoinOp =
+        (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
+            isSubQuery, mapJoinConversionPos, mapJoinDesc));
+    OpTraits opTraits =
+        new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
+            .getSortCols());
+    mergeJoinOp.setOpTraits(opTraits);
+    mergeJoinOp.setStatistics(joinOp.getStatistics());
+
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      int pos = parentOp.getChildOperators().indexOf(joinOp);
+      parentOp.getChildOperators().remove(pos);
+      parentOp.getChildOperators().add(pos, mergeJoinOp);
+    }
+
+    for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
+      int pos = childOp.getParentOperators().indexOf(joinOp);
+      childOp.getParentOperators().remove(pos);
+      childOp.getParentOperators().add(pos, mergeJoinOp);
+    }
+
+    List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
+    if (childOperators == null) {
+      childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+      mergeJoinOp.setChildOperators(childOperators);
+    }
+
+    List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
+    if (parentOperators == null) {
+      parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
+      mergeJoinOp.setParentOperators(parentOperators);
+    }
+
+    childOperators.clear();
+    parentOperators.clear();
+    childOperators.addAll(joinOp.getChildOperators());
+    parentOperators.addAll(joinOp.getParentOperators());
+    mergeJoinOp.getConf().setGenJoinKeys(false);
+
+    if (adjustParentsChildren) {
+      mergeJoinOp.getConf().setGenJoinKeys(true);
+      List<Operator<? extends OperatorDesc>> newParentOpList =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
+        for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
+          grandParentOp.getChildOperators().remove(parentOp);
+          grandParentOp.getChildOperators().add(mergeJoinOp);
+          newParentOpList.add(grandParentOp);
+        }
+      }
+      mergeJoinOp.getParentOperators().clear();
+      mergeJoinOp.getParentOperators().addAll(newParentOpList);
+      List<Operator<? extends OperatorDesc>> parentOps =
+          new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
+      for (Operator<? extends OperatorDesc> parentOp : parentOps) {
+        int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
+        if (parentIndex == mapJoinConversionPos) {
+          continue;
+        }
+
+        // insert the dummy store operator here
+        DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
+        dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+        dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
+        dummyStoreOp.getChildOperators().add(mergeJoinOp);
+        int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
+        parentOp.getChildOperators().remove(index);
+        parentOp.getChildOperators().add(index, dummyStoreOp);
+        dummyStoreOp.getParentOperators().add(parentOp);
+        mergeJoinOp.getParentOperators().remove(parentIndex);
+        mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
+      }
+    }
+    mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
+  }
+
   private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
     if (currentOp instanceof ReduceSinkOperator) {
       return;
     }
-    currentOp.setOpTraits(new OpTraits(null, -1));
+    currentOp.setOpTraits(new OpTraits(null, -1, null));
     for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
         break;
@@ -151,28 +315,26 @@ public class ConvertJoinMapJoin implemen
     }
   }
 
-  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
-      int bigTablePosition) throws SemanticException {
-
-    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
+  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
 
     if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
       LOG.info("Check conversion to bucket map join failed.");
       return false;
     }
 
-    MapJoinOperator mapJoinOp = 
-      convertJoinMapJoin(joinOp, context, bigTablePosition);
+    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
     joinDesc.setBucketMapJoin(true);
 
     // we can set the traits for this join operator
     OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
-        tezBucketJoinProcCtx.getNumBuckets());
+        tezBucketJoinProcCtx.getNumBuckets(), null);
     mapJoinOp.setOpTraits(opTraits);
+    mapJoinOp.setStatistics(joinOp.getStatistics());
     setNumberOfBucketsOnChildren(mapJoinOp);
 
-    // Once the conversion is done, we can set the partitioner to bucket cols on the small table    
+    // Once the conversion is done, we can set the partitioner to bucket cols on the small table
     Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
     bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
     joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
@@ -182,6 +344,54 @@ public class ConvertJoinMapJoin implemen
     return true;
   }
 
+  /*
+   * This method tries to convert a join to an SMB. This is done based on
+   * traits. If the sorted by columns are the same as the join columns then, we
+   * can convert the join to an SMB. Otherwise retain the bucket map join as it
+   * is still more efficient than a regular join.
+   */
+  private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+
+    ReduceSinkOperator bigTableRS =
+        (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+    int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits()
+            .getNumBuckets();
+
+    // the sort and bucket cols have to match on both sides for this
+    // transformation of the join operation
+    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
+      if (!(parentOp instanceof ReduceSinkOperator)) {
+        // could be mux/demux operators. Currently not supported
+        LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time.");
+        return false;
+      }
+      ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
+      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
+          .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) {
+        LOG.info("We cannot convert to SMB because the sort column names do not match.");
+        return false;
+      }
+
+      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
+          .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx)
+          == false) {
+        LOG.info("We cannot convert to SMB because bucket column names do not match.");
+        return false;
+      }
+    }
+
+    boolean isSubQuery = false;
+    if (numBuckets < 0) {
+      isSubQuery = true;
+      numBuckets = bigTableRS.getConf().getNumReducers();
+    }
+    tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+    LOG.info("We can convert the join to an SMB join.");
+    return true;
+  }
+
   private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
     int numBuckets = currentOp.getOpTraits().getNumBuckets();
     for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
@@ -193,15 +403,13 @@ public class ConvertJoinMapJoin implemen
   }
 
   /*
-   *  We perform the following checks to see if we can convert to a bucket map join
-   *  1. If the parent reduce sink of the big table side has the same emit key cols as 
-   *  its parent, we can create a bucket map join eliminating the reduce sink.
-   *  2. If we have the table information, we can check the same way as in Mapreduce to 
-   *  determine if we can perform a Bucket Map Join.
+   * If the parent reduce sink of the big table side has the same emit key cols
+   * as its parent, we can create a bucket map join eliminating the reduce sink.
    */
-  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, 
-      OptimizeTezProcContext context, int bigTablePosition, 
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
+      OptimizeTezProcContext context, int bigTablePosition,
+      TezBucketJoinProcCtx tezBucketJoinProcCtx)
+  throws SemanticException {
     // bail on mux-operator because mux operator masks the emit keys of the
     // constituent reduce sinks
     if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
@@ -211,14 +419,41 @@ public class ConvertJoinMapJoin implemen
     }
 
     ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
+    List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames();
+    Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0);
+    List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames();
+    int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
+    // all keys matched.
+    if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
+        tezBucketJoinProcCtx) == false) {
+      LOG.info("No info available to check for bucket map join. Cannot convert");
+      return false;
+    }
+
     /*
      * this is the case when the big table is a sub-query and is probably
-     * already bucketed by the join column in say a group by operation 
+     * already bucketed by the join column in say a group by operation
      */
-    List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames();
-    if ((colNames != null) && (colNames.isEmpty() == false)) {
-      Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0);
-      for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) {
+    boolean isSubQuery = false;
+    if (numBuckets < 0) {
+      isSubQuery = true;
+      numBuckets = rs.getConf().getNumReducers();
+    }
+    tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+    return true;
+  }
+
+  private boolean checkColEquality(List<List<String>> grandParentColNames,
+      List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
+      TezBucketJoinProcCtx tezBucketJoinProcCtx) {
+
+    if ((grandParentColNames == null) || (parentColNames == null)) {
+      return false;
+    }
+
+    if ((parentColNames != null) && (parentColNames.isEmpty() == false)) {
+      for (List<String> listBucketCols : grandParentColNames) {
         // can happen if this operator does not carry forward the previous bucketing columns
         // for e.g. another join operator which does not carry one of the sides' key columns
         if (listBucketCols.isEmpty()) {
@@ -226,9 +461,9 @@ public class ConvertJoinMapJoin implemen
         }
         int colCount = 0;
         // parent op is guaranteed to have a single list because it is a reduce sink
-        for (String colName : rs.getOpTraits().getBucketColNames().get(0)) {
+        for (String colName : parentColNames.get(0)) {
           // all columns need to be at least a subset of the parentOfParent's bucket cols
-          ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName);
+          ExprNodeDesc exprNodeDesc = colExprMap.get(colName);
           if (exprNodeDesc instanceof ExprNodeColumnDesc) {
             if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) {
               colCount++;
@@ -236,32 +471,21 @@ public class ConvertJoinMapJoin implemen
               break;
             }
           }
-          
-          if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) {
-            // all keys matched.
-            int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
-            boolean isSubQuery = false;
-            if (numBuckets < 0) {
-              isSubQuery = true;
-              numBuckets = rs.getConf().getNumReducers();
-            }
-            tezBucketJoinProcCtx.setNumBuckets(numBuckets);
-            tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
+
+          if (colCount == parentColNames.get(0).size()) {
             return true;
           }
         }
       }
       return false;
     }
-
-    LOG.info("No info available to check for bucket map join. Cannot convert");
     return false;
   }
 
-  public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, 
+  public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
       int buckets) {
-    Set<Integer> bigTableCandidateSet = MapJoinProcessor.
-      getBigTableCandidates(joinOp.getConf().getConds());
+    Set<Integer> bigTableCandidateSet =
+        MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
 
     long maxSize = context.conf.getLongVar(
         HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -287,7 +511,7 @@ public class ConvertJoinMapJoin implemen
       long inputSize = currInputStat.getDataSize();
       if ((bigInputStat == null) ||
           ((bigInputStat != null) &&
-           (inputSize > bigInputStat.getDataSize()))) {
+          (inputSize > bigInputStat.getDataSize()))) {
 
         if (bigTableFound) {
           // cannot convert to map join; we've already chosen a big table
@@ -347,9 +571,9 @@ public class ConvertJoinMapJoin implemen
    * for tez.
    */
 
-  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
+  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
       int bigTablePosition) throws SemanticException {
-    // bail on mux operator because currently the mux operator masks the emit keys 
+    // bail on mux operator because currently the mux operator masks the emit keys
     // of the constituent reduce sinks.
     for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
       if (parentOp instanceof MuxOperator) {
@@ -359,12 +583,12 @@ public class ConvertJoinMapJoin implemen
 
     //can safely convert the join to a map join.
     ParseContext parseContext = context.parseContext;
-    MapJoinOperator mapJoinOp = MapJoinProcessor.
-      convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
-          joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+    MapJoinOperator mapJoinOp =
+        MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
+            parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
 
-    Operator<? extends OperatorDesc> parentBigTableOp
-      = mapJoinOp.getParentOperators().get(bigTablePosition);
+    Operator<? extends OperatorDesc> parentBigTableOp =
+        mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
       for (Operator<?> p : parentBigTableOp.getParentOperators()) {
         // we might have generated a dynamic partition operator chain. Since
@@ -380,11 +604,10 @@ public class ConvertJoinMapJoin implemen
         }
       }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
-      if (!(mapJoinOp.getParentOperators().contains(
-              parentBigTableOp.getParentOperators().get(0)))) {
+      if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
         mapJoinOp.getParentOperators().add(bigTablePosition,
             parentBigTableOp.getParentOperators().get(0));
-              }
+      }
       parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
       for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
         if (!(op.getChildOperators().contains(mapJoinOp))) {
@@ -397,15 +620,31 @@ public class ConvertJoinMapJoin implemen
     return mapJoinOp;
   }
 
-  private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
-    if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
-      return true;
-    }
-    for (Operator<?> c : op.getChildOperators()) {
-      if (hasDynamicPartitionBroadcast(c)) {
-        return true;
+  private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
+    boolean hasDynamicPartitionPruning = false;
+
+    for (Operator<?> op: parent.getChildOperators()) {
+      while (op != null) {
+        if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+          // found dynamic partition pruning operator
+          hasDynamicPartitionPruning = true;
+          break;
+        }
+      
+        if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
+          // crossing reduce sink or file sink means the pruning isn't for this parent.
+          break;
+        }
+
+        if (op.getChildOperators().size() != 1) {
+          // dynamic partition pruning pipeline doesn't have multiple children
+          break;
+        }
+
+        op = op.getChildOperators().get(0);
       }
     }
-    return false;
+
+    return hasDynamicPartitionPruning;
   }
 }

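The rewritten hasDynamicPartitionBroadcast above follows each child pipeline one operator at a time and gives up at a reduce sink, a file sink, or any branch point, reporting a match only if it reaches a dynamic-pruning event operator first. A generic sketch of that linear downward walk; the Node type is a hypothetical stand-in for Hive's Operator classes:

    import java.util.ArrayList;
    import java.util.List;

    public class PipelineWalkExample {
      static class Node {
        boolean isPruningEvent;   // stands in for AppMasterEventOperator + DynamicPruningEventDesc
        boolean isSinkBoundary;   // stands in for ReduceSinkOperator / FileSinkOperator
        List<Node> children = new ArrayList<Node>();
      }

      // True if any child pipeline reaches a pruning event before a sink or a branch,
      // mirroring the control flow of the patched hasDynamicPartitionBroadcast.
      static boolean hasPruningBroadcast(Node parent) {
        for (Node op : parent.children) {
          while (op != null) {
            if (op.isPruningEvent) {
              return true;                      // found the pruning operator in this pipeline
            }
            if (op.isSinkBoundary || op.children.size() != 1) {
              break;                            // crossed a boundary or the pipeline branches
            }
            op = op.children.get(0);            // follow the single child
          }
        }
        return false;
      }
    }
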
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Sun Oct  5 22:26:43 2014
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
+import com.google.common.collect.Interner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -39,8 +40,6 @@ import org.apache.hadoop.hive.ql.exec.No
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
-import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -101,7 +100,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 
 import java.io.Serializable;
@@ -580,8 +578,6 @@ public final class GenMapRedUtils {
     //This read entity is a direct read entity and not an indirect read (that is when
     // this is being read because it is a dependency of a view).
     boolean isDirectRead = (parentViewInfo == null);
-    PlanUtils.addInput(inputs,
-        new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo, isDirectRead));
 
     for (Partition part : parts) {
       if (part.getTable().isPartitioned()) {
@@ -882,6 +878,30 @@ public final class GenMapRedUtils {
     }
   }
 
+  public static void internTableDesc(Task<?> task, Interner<TableDesc> interner) {
+
+    if (task instanceof ConditionalTask) {
+      for (Task tsk : ((ConditionalTask) task).getListTasks()) {
+        internTableDesc(tsk, interner);
+      }
+    } else if (task instanceof ExecDriver) {
+      MapredWork work = (MapredWork) task.getWork();
+      work.getMapWork().internTable(interner);
+    } else if (task != null && (task.getWork() instanceof TezWork)) {
+      TezWork work = (TezWork)task.getWork();
+      for (BaseWork w : work.getAllWorkUnsorted()) {
+        if (w instanceof MapWork) {
+          ((MapWork)w).internTable(interner);
+        }
+      }
+    }
+    if (task.getNumChild() > 0) {
+      for (Task childTask : task.getChildTasks()) {
+        internTableDesc(childTask, interner);
+      }
+    }
+  }
+
   /**
    * create a new plan and return.
    *
@@ -1507,7 +1527,7 @@ public final class GenMapRedUtils {
    *
    * @param fsInputDesc
    * @param finalName
-   * @param inputFormatClass 
+   * @param inputFormatClass
    * @return MergeWork if table is stored as RCFile or ORCFile,
    *         null otherwise
    */
@@ -1714,7 +1734,7 @@ public final class GenMapRedUtils {
           // There are separate configuration parameters to control whether to
           // merge for a map-only job
           // or for a map-reduce job
-          if (currTask.getWork() instanceof MapredWork) {  
+          if (currTask.getWork() instanceof MapredWork) {
             ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
             boolean mergeMapOnly =
               hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
@@ -1813,7 +1833,7 @@ public final class GenMapRedUtils {
     return Collections.emptyList();
   }
 
-  public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey) 
+  public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
     throws SemanticException {
     List<Path> inputPaths = new ArrayList<Path>();
     switch (parseInfo.getTableSpec().specType) {
@@ -1850,6 +1870,7 @@ public final class GenMapRedUtils {
   public static Set<Operator<?>> findTopOps(Operator<?> startOp, final Class<?> clazz) {
     final Set<Operator<?>> operators = new LinkedHashSet<Operator<?>>();
     OperatorUtils.iterateParents(startOp, new NodeUtils.Function<Operator<?>>() {
+      @Override
       public void apply(Operator<?> argument) {
         if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) {
           operators.add(argument);

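The new GenMapRedUtils.internTableDesc above walks a task tree (ConditionalTask children, MapReduce ExecDriver work, and the MapWork vertices of a TezWork) and interns each MapWork's table descriptors through a shared Guava Interner, so identical TableDesc instances collapse to one canonical object. A minimal caller sketch, assuming Guava is on the classpath and that the compiled plan's root tasks are available as a List; the rootTasks parameter and the wrapper class are illustrative, not part of this commit, and interning only pays off if TableDesc compares by value:

import java.io.Serializable;
import java.util.List;

import com.google.common.collect.Interner;
import com.google.common.collect.Interners;

import org.apache.hadoop.hive.ql.exec.Task;
import org.apache.hadoop.hive.ql.optimizer.GenMapRedUtils;
import org.apache.hadoop.hive.ql.plan.TableDesc;

public class TableDescInterningSketch {
  // Intern TableDesc objects across every task of a compiled plan so that
  // duplicate descriptors share a single canonical instance in memory.
  public static void internAll(List<Task<? extends Serializable>> rootTasks) {
    // Strong interner: canonical descriptors live as long as the interner itself.
    Interner<TableDesc> interner = Interners.newStrongInterner();
    for (Task<? extends Serializable> task : rootTasks) {
      GenMapRedUtils.internTableDesc(task, interner);
    }
  }
}
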
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Sun Oct  5 22:26:43 2014
@@ -389,157 +389,8 @@ public class MapJoinProcessor implements
       JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
       throws SemanticException {
 
-    JoinDesc desc = op.getConf();
-    JoinCondDesc[] condns = desc.getConds();
-    Byte[] tagOrder = desc.getTagOrder();
-
-    // outer join cannot be performed on a table which is being cached
-    if (!noCheckOuterJoin) {
-      if (checkMapJoin(mapJoinPos, condns) < 0) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-    }
-
-    // Walk over all the sources (which are guaranteed to be reduce sink
-    // operators).
-    // The join outputs a concatenation of all the inputs.
-    QBJoinTree leftSrc = joinTree.getJoinSrc();
-    List<ReduceSinkOperator> oldReduceSinkParentOps =
-        new ArrayList<ReduceSinkOperator>(op.getNumParent());
-    if (leftSrc != null) {
-      // assert mapJoinPos == 0;
-      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
-      assert parentOp.getParentOperators().size() == 1;
-      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
-    }
-
-
-    byte pos = 0;
-    for (String src : joinTree.getBaseSrc()) {
-      if (src != null) {
-        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
-        assert parentOp.getParentOperators().size() == 1;
-        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
-      }
-      pos++;
-    }
-
-    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
-    List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
-    Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
-    Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
-      byte tag = entry.getKey();
-      Operator<?> terminal = oldReduceSinkParentOps.get(tag);
-
-      List<ExprNodeDesc> values = entry.getValue();
-      List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
-      newValueExprs.put(tag, newValues);
-      for (int i = 0; i < schema.size(); i++) {
-        ColumnInfo column = schema.get(i);
-        if (column == null) {
-          continue;
-        }
-        ExprNodeDesc expr = colExprMap.get(column.getInternalName());
-        int index = ExprNodeDescUtils.indexOf(expr, values);
-        if (index >= 0) {
-          colExprMap.put(column.getInternalName(), newValues.get(index));
-          schema.set(i, null);
-        }
-      }
-    }
-
-    // rewrite value index for mapjoin
-    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
-
-    // get the join keys from old parent ReduceSink operators
-    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
-    // construct valueTableDescs and valueFilteredTableDescs
-    List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
-    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
-    int[][] filterMap = desc.getFilterMap();
-    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
-      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
-      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
-      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
-      if (pos != mapJoinPos) {
-        // remove values in key exprs for value table schema
-        // value expression for hashsink will be modified in LocalMapJoinProcessor
-        int[] valueIndex = new int[valueCols.size()];
-        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
-        for (int i = 0; i < valueIndex.length; i++) {
-          ExprNodeDesc expr = valueCols.get(i);
-          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
-          if (kindex >= 0) {
-            valueIndex[i] = kindex;
-          } else {
-            valueIndex[i] = -valueColsInValueExpr.size() - 1;
-            valueColsInValueExpr.add(expr);
-          }
-        }
-        if (needValueIndex(valueIndex)) {
-          valueIndices.put(pos, valueIndex);
-        }
-        valueCols = valueColsInValueExpr;
-      }
-      // deep copy expr node desc
-      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
-      if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
-        ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
-            .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
-        valueFilteredCols.add(isFilterDesc);
-      }
-
-      TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
-          .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
-      TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
-          .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
-
-      valueTableDescs.add(valueTableDesc);
-      valueFilteredTableDescs.add(valueFilteredTableDesc);
-
-      keyExprMap.put(pos, keyCols);
-    }
-
-    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
-    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
-      byte srcTag = entry.getKey();
-      List<ExprNodeDesc> filter = entry.getValue();
-
-      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
-      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
-    }
-    desc.setFilters(filters = newFilters);
-
-    // create dumpfile prefix needed to create descriptor
-    String dumpFilePrefix = "";
-    if( joinTree.getMapAliases() != null ) {
-      for(String mapAlias : joinTree.getMapAliases()) {
-        dumpFilePrefix = dumpFilePrefix + mapAlias;
-      }
-      dumpFilePrefix = dumpFilePrefix+"-"+PlanUtils.getCountForMapJoinDumpFilePrefix();
-    } else {
-      dumpFilePrefix = "mapfile"+PlanUtils.getCountForMapJoinDumpFilePrefix();
-    }
-
-    List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
-
-    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
-    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
-        PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
-    JoinCondDesc[] joinCondns = op.getConf().getConds();
-    MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
-        valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
-        filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
-    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
-    mapJoinDescriptor.setTagOrder(tagOrder);
-    mapJoinDescriptor.setNullSafes(desc.getNullSafes());
-    mapJoinDescriptor.setFilterMap(desc.getFilterMap());
-    if (!valueIndices.isEmpty()) {
-      mapJoinDescriptor.setValueIndices(valueIndices);
-    }
+    MapJoinDesc mapJoinDescriptor =
+        getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
 
     // reduce sink row resolver used to generate map join op
     RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -551,6 +402,7 @@ public class MapJoinProcessor implements
     opParseCtxMap.put(mapJoinOp, ctx);
 
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
     mapJoinOp.setColumnExprMap(colExprMap);
 
     List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
@@ -1176,4 +1028,168 @@ public class MapJoinProcessor implements
     }
 
   }
+
+  public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
+      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
+      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
+    JoinDesc desc = op.getConf();
+    JoinCondDesc[] condns = desc.getConds();
+    Byte[] tagOrder = desc.getTagOrder();
+
+    // outer join cannot be performed on a table which is being cached
+    if (!noCheckOuterJoin) {
+      if (checkMapJoin(mapJoinPos, condns) < 0) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+    }
+
+    // Walk over all the sources (which are guaranteed to be reduce sink
+    // operators).
+    // The join outputs a concatenation of all the inputs.
+    QBJoinTree leftSrc = joinTree.getJoinSrc();
+    List<ReduceSinkOperator> oldReduceSinkParentOps =
+        new ArrayList<ReduceSinkOperator>(op.getNumParent());
+    if (leftSrc != null) {
+      // assert mapJoinPos == 0;
+      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+      assert parentOp.getParentOperators().size() == 1;
+      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+    }
+
+    byte pos = 0;
+    for (String src : joinTree.getBaseSrc()) {
+      if (src != null) {
+        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+        assert parentOp.getParentOperators().size() == 1;
+        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+      }
+      pos++;
+    }
+
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
+    List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
+    Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
+    Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
+      byte tag = entry.getKey();
+      Operator<?> terminal = oldReduceSinkParentOps.get(tag);
+
+      List<ExprNodeDesc> values = entry.getValue();
+      List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
+      newValueExprs.put(tag, newValues);
+      for (int i = 0; i < schema.size(); i++) {
+        ColumnInfo column = schema.get(i);
+        if (column == null) {
+          continue;
+        }
+        ExprNodeDesc expr = colExprMap.get(column.getInternalName());
+        int index = ExprNodeDescUtils.indexOf(expr, values);
+        if (index >= 0) {
+          colExprMap.put(column.getInternalName(), newValues.get(index));
+          schema.set(i, null);
+        }
+      }
+    }
+
+    // rewrite value index for mapjoin
+    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+    // get the join keys from old parent ReduceSink operators
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+    // construct valueTableDescs and valueFilteredTableDescs
+    List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
+    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
+    int[][] filterMap = desc.getFilterMap();
+    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+      if (pos != mapJoinPos) {
+        // remove values in key exprs for value table schema
+        // value expression for hashsink will be modified in
+        // LocalMapJoinProcessor
+        int[] valueIndex = new int[valueCols.size()];
+        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+        for (int i = 0; i < valueIndex.length; i++) {
+          ExprNodeDesc expr = valueCols.get(i);
+          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+          if (kindex >= 0) {
+            valueIndex[i] = kindex;
+          } else {
+            valueIndex[i] = -valueColsInValueExpr.size() - 1;
+            valueColsInValueExpr.add(expr);
+          }
+        }
+        if (needValueIndex(valueIndex)) {
+          valueIndices.put(pos, valueIndex);
+        }
+        valueCols = valueColsInValueExpr;
+      }
+      // deep copy expr node desc
+      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
+      if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
+        ExprNodeColumnDesc isFilterDesc =
+            new ExprNodeColumnDesc(
+                TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter",
+                "filter", false);
+        valueFilteredCols.add(isFilterDesc);
+      }
+
+      TableDesc valueTableDesc =
+          PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols,
+              "mapjoinvalue"));
+      TableDesc valueFilteredTableDesc =
+          PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
+              valueFilteredCols, "mapjoinvalue"));
+
+      valueTableDescs.add(valueTableDesc);
+      valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+      keyExprMap.put(pos, keyCols);
+    }
+
+    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+      byte srcTag = entry.getKey();
+      List<ExprNodeDesc> filter = entry.getValue();
+
+      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+    }
+    desc.setFilters(filters = newFilters);
+
+    // create dumpfile prefix needed to create descriptor
+    String dumpFilePrefix = "";
+    if (joinTree.getMapAliases() != null) {
+      for (String mapAlias : joinTree.getMapAliases()) {
+        dumpFilePrefix = dumpFilePrefix + mapAlias;
+      }
+      dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+    } else {
+      dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+    }
+
+    List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos);
+
+    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+    TableDesc keyTableDesc =
+        PlanUtils.getMapJoinKeyTableDesc(hconf,
+            PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+    JoinCondDesc[] joinCondns = op.getConf().getConds();
+    MapJoinDesc mapJoinDescriptor =
+        new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
+            valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
+                .getConf().getNoOuterJoin(), dumpFilePrefix);
+    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
+    mapJoinDescriptor.setTagOrder(tagOrder);
+    mapJoinDescriptor.setNullSafes(desc.getNullSafes());
+    mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+    if (!valueIndices.isEmpty()) {
+      mapJoinDescriptor.setValueIndices(valueIndices);
+    }
+
+    return mapJoinDescriptor;
+  }
 }

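Moving the body of the map-join conversion into the new public static MapJoinProcessor.getMapJoinDesc lets other planners build a MapJoinDesc for a JoinOperator without duplicating the key/value TableDesc construction. A hedged sketch of such a caller, where the parse-context map, join operator, join tree and big-table position are assumed to be supplied by the caller; the wrapper class and names below are illustrative only:

import java.util.LinkedHashMap;

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.JoinOperator;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor;
import org.apache.hadoop.hive.ql.parse.OpParseContext;
import org.apache.hadoop.hive.ql.parse.QBJoinTree;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;

public class MapJoinDescReuseSketch {
  // Build a MapJoinDesc for the join, treating bigTablePos as the streamed (big) table.
  public static MapJoinDesc build(HiveConf hconf,
      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
      JoinOperator joinOp, QBJoinTree joinTree, int bigTablePos) throws SemanticException {
    // noCheckOuterJoin = false keeps the guard that rejects caching the outer side of the join.
    return MapJoinProcessor.getMapJoinDesc(hconf, opParseCtxMap, joinOp, joinTree,
        bigTablePos, false);
  }
}
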
Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Sun Oct  5 22:26:43 2014
@@ -51,7 +51,12 @@ public class Optimizer {
    * @param hiveConf
    */
   public void initialize(HiveConf hiveConf) {
+
+    boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
+    boolean bucketMapJoinOptimizer = false;
+
     transformations = new ArrayList<Transform>();
+
     // Add the transformation that computes the lineage information.
     transformations.add(new Generator());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
@@ -81,15 +86,16 @@ public class Optimizer {
     }
     transformations.add(new SamplePruner());
     transformations.add(new MapJoinProcessor());
-    boolean bucketMapJoinOptimizer = false;
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
+
+    if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) {
       transformations.add(new BucketMapJoinOptimizer());
       bucketMapJoinOptimizer = true;
     }
 
     // If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both
     // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
-    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
+    if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
+        && !isTezExecEngine) {
       if (!bucketMapJoinOptimizer) {
         // No need to add BucketMapJoinOptimizer twice
         transformations.add(new BucketMapJoinOptimizer());
@@ -119,7 +125,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
-        !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
+        !isTezExecEngine) {
       transformations.add(new CorrelationOptimizer());
     }
     if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
@@ -128,8 +134,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTIMIZEMETADATAQUERIES)) {
       transformations.add(new StatsOptimizer());
     }
-    String execEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
-    if ((pctx.getContext().getExplain() || "spark".equals(execEngine)) && !"tez".equals(execEngine)) {
+    if (pctx.getContext().getExplain() && !isTezExecEngine) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Sun Oct  5 22:26:43 2014
@@ -52,6 +52,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 public class ReduceSinkMapJoinProc implements NodeProcessor {
@@ -183,7 +184,10 @@ public class ReduceSinkMapJoinProc imple
         TezWork tezWork = context.currentTask.getWork();
         LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
         tezWork.connect(parentWork, myWork, edgeProp);
-        
+        if (edgeType == EdgeType.CUSTOM_EDGE) {
+          tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES);
+        }
+
         ReduceSinkOperator r = null;
         if (parentRS.getConf().getOutputName() != null) {
           LOG.debug("Cloning reduce sink for multi-child broadcast edge");

Modified: hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1629544&r1=1629543&r2=1629544&view=diff
==============================================================================
--- hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original)
+++ hive/branches/spark-new/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Sun Oct  5 22:26:43 2014
@@ -44,9 +44,9 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -55,13 +55,25 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
+import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
+import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -73,9 +85,11 @@ public class SimpleFetchOptimizer implem
 
   private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName());
 
+  @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
     Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
-    if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) {
+    if (pctx.getQB().getIsQuery() && !pctx.getQB().getParseInfo().isAnalyzeCommand()
+        && topOps.size() == 1) {
       // no join, no groupby, no distinct, no lateral view, no subq,
       // no CTAS or insert, not analyze command, and single sourced.
       String alias = (String) pctx.getTopOps().keySet().toArray()[0];
@@ -144,7 +158,7 @@ public class SimpleFetchOptimizer implem
   // for non-aggressive mode (minimal)
   // 1. sampling is not allowed
   // 2. for partitioned table, all filters should be targeted to partition column
-  // 3. SelectOperator should be select star
+  // 3. SelectOperator should use only simple cast/column access
   private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias,
       TableScanOperator ts) throws HiveException {
     SplitSample splitSample = pctx.getNameToSplitSample().get(alias);
@@ -156,7 +170,7 @@ public class SimpleFetchOptimizer implem
       return null;
     }
 
-    Table table = qb.getMetaData().getAliasToTable().get(alias);
+    Table table = pctx.getTopToTable().get(ts);
     if (table == null) {
       return null;
     }
@@ -181,34 +195,71 @@ public class SimpleFetchOptimizer implem
     return null;
   }
 
-  private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggresive,
+  private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive,
       boolean bypassFilter) {
     if (ts.getChildOperators().size() != 1) {
       return null;
     }
     Operator<?> op = ts.getChildOperators().get(0);
     for (; ; op = op.getChildOperators().get(0)) {
-      if (aggresive) {
-        if (!(op instanceof LimitOperator || op instanceof FilterOperator
-            || op instanceof SelectOperator)) {
+      if (op instanceof SelectOperator) {
+        if (!aggressive) {
+          if (!checkExpressions((SelectOperator) op)) {
+            break;
+          }
+        }
+        continue;
+      }
+
+      if (aggressive) {
+        if (!(op instanceof LimitOperator || op instanceof FilterOperator)) {
           break;
         }
-      } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)
-          || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) {
+      } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
         break;
       }
+
       if (op.getChildOperators() == null || op.getChildOperators().size() != 1) {
         return null;
       }
     }
+
     if (op instanceof FileSinkOperator) {
       fetch.scanOp = ts;
       fetch.fileSink = op;
       return fetch;
     }
+
     return null;
   }
 
+  private boolean checkExpressions(SelectOperator op) {
+    SelectDesc desc = op.getConf();
+    for (ExprNodeDesc expr : desc.getColList()) {
+      if (!checkExpression(expr)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private boolean checkExpression(ExprNodeDesc expr) {
+    if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeColumnDesc) {
+      return true;
+    }
+
+    if (expr instanceof ExprNodeGenericFuncDesc) {
+      GenericUDF udf = ((ExprNodeGenericFuncDesc) expr).getGenericUDF();
+      if (udf instanceof GenericUDFToBinary || udf instanceof GenericUDFToChar
+          || udf instanceof GenericUDFToDate || udf instanceof GenericUDFToDecimal
+          || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp
+          || udf instanceof GenericUDFToVarchar) {
+        return expr.getChildren().size() == 1 && checkExpression(expr.getChildren().get(0));
+      }
+    }
+    return false;
+  }
+
   private class FetchData {
 
     private final ReadEntity parent;
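With this change the non-aggressive fetch path accepts select expressions that are plain columns, constants, or one of the whitelisted cast UDFs applied to such an input, rather than requiring a literal select star. A small self-contained sketch of the same whitelist-style recursive check, using stand-in expression classes instead of Hive's ExprNodeDesc hierarchy; all names below are illustrative:

import java.util.Arrays;
import java.util.List;

public class SimpleFetchExprCheckSketch {
  interface Expr { }
  static class ColumnRef implements Expr { final String name; ColumnRef(String name) { this.name = name; } }
  static class Constant implements Expr { final Object value; Constant(Object value) { this.value = value; } }
  static class Cast implements Expr {
    final String targetType; final Expr child;
    Cast(String targetType, Expr child) { this.targetType = targetType; this.child = child; }
  }

  // Target types for which a single-argument cast is still considered "simple".
  private static final List<String> SIMPLE_CASTS =
      Arrays.asList("binary", "char", "date", "decimal", "unix_timestamp", "utc_timestamp", "varchar");

  // A select list qualifies only if every expression in it qualifies.
  static boolean checkExpressions(List<Expr> selectList) {
    for (Expr e : selectList) {
      if (!checkExpression(e)) {
        return false;
      }
    }
    return true;
  }

  // Columns and constants qualify; a whitelisted cast qualifies if its only child does.
  static boolean checkExpression(Expr e) {
    if (e instanceof ColumnRef || e instanceof Constant) {
      return true;
    }
    if (e instanceof Cast && SIMPLE_CASTS.contains(((Cast) e).targetType)) {
      return checkExpression(((Cast) e).child);
    }
    return false;
  }

  public static void main(String[] args) {
    // cast(col as date) passes; an arbitrary function (modeled as an unlisted cast) does not.
    System.out.println(checkExpressions(Arrays.<Expr>asList(new Cast("date", new ColumnRef("ds")))));  // true
    System.out.println(checkExpressions(Arrays.<Expr>asList(new Cast("upper", new ColumnRef("ds"))))); // false
  }
}
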
@@ -240,7 +291,7 @@ public class SimpleFetchOptimizer implem
       this.splitSample = splitSample;
       this.onlyPruningFilter = bypassFilter;
     }
-    
+
     /*
      * all filters were executed during partition pruning
      */
@@ -251,7 +302,7 @@ public class SimpleFetchOptimizer implem
     private FetchWork convertToWork() throws HiveException {
       inputs.clear();
       if (!table.isPartitioned()) {
-        inputs.add(new ReadEntity(table, parent));
+        inputs.add(new ReadEntity(table, parent, parent == null));
         FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table));
         PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc());
         work.setSplitSample(splitSample);
@@ -261,12 +312,12 @@ public class SimpleFetchOptimizer implem
       List<PartitionDesc> partP = new ArrayList<PartitionDesc>();
 
       for (Partition partition : partsList.getNotDeniedPartns()) {
-        inputs.add(new ReadEntity(partition, parent));
+        inputs.add(new ReadEntity(partition, parent, parent == null));
         listP.add(partition.getDataLocation());
         partP.add(Utilities.getPartitionDesc(partition));
       }
       Table sourceTable = partsList.getSourceTable();
-      inputs.add(new ReadEntity(sourceTable, parent));
+      inputs.add(new ReadEntity(sourceTable, parent, parent == null));
       TableDesc table = Utilities.getTableDesc(sourceTable);
       FetchWork work = new FetchWork(listP, partP, table);
       if (!work.getPartDesc().isEmpty()) {