Posted to commits@hive.apache.org by br...@apache.org on 2014/10/06 05:44:26 UTC

svn commit: r1629562 [6/38] - in /hive/branches/spark: ./ accumulo-handler/ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ext/ common/ common/src/java/org/apache/hadoop/hive/conf/ common/src/test/org/apache/hadoop/hive/common/type/ contrib/src...

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java Mon Oct  6 03:44:13 2014
@@ -16,12 +16,19 @@ package org.apache.hadoop.hive.ql.io.par
 import java.math.BigDecimal;
 import java.sql.Timestamp;
 import java.util.ArrayList;
+import java.util.List;
 
+import org.apache.hadoop.hive.common.type.HiveChar;
+import org.apache.hadoop.hive.common.type.HiveVarchar;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTime;
 import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveCharWritable;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.HiveVarcharWritable;
 import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.FloatWritable;
@@ -145,6 +152,32 @@ public enum ETypeConverter {
         }
       };
     }
+  },
+  ECHAR_CONVERTER(HiveCharWritable.class) {
+    @Override
+    Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
+      return new BinaryConverter<HiveCharWritable>(type, parent, index) {
+        @Override
+        protected HiveCharWritable convert(Binary binary) {
+          HiveChar hiveChar = new HiveChar();
+          hiveChar.setValue(binary.toStringUsingUTF8());
+          return new HiveCharWritable(hiveChar);
+        }
+      };
+    }
+  },
+  EVARCHAR_CONVERTER(HiveVarcharWritable.class) {
+    @Override
+    Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent) {
+      return new BinaryConverter<HiveVarcharWritable>(type, parent, index) {
+        @Override
+        protected HiveVarcharWritable convert(Binary binary) {
+          HiveVarchar hiveVarchar = new HiveVarchar();
+          hiveVarchar.setValue(binary.toStringUsingUTF8());
+          return new HiveVarcharWritable(hiveVarchar);
+        }
+      };
+    }
   };
 
   final Class<?> _type;
@@ -160,7 +193,7 @@ public enum ETypeConverter {
   abstract Converter getConverter(final PrimitiveType type, final int index, final HiveGroupConverter parent);
 
   public static Converter getNewConverter(final PrimitiveType type, final int index,
-      final HiveGroupConverter parent) {
+      final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
     if (type.isPrimitive() && (type.asPrimitiveType().getPrimitiveTypeName().equals(PrimitiveType.PrimitiveTypeName.INT96))) {
       //TODO- cleanup once parquet support Timestamp type annotation.
       return ETypeConverter.ETIMESTAMP_CONVERTER.getConverter(type, index, parent);
@@ -168,7 +201,15 @@ public enum ETypeConverter {
     if (OriginalType.DECIMAL == type.getOriginalType()) {
       return EDECIMAL_CONVERTER.getConverter(type, index, parent);
     } else if (OriginalType.UTF8 == type.getOriginalType()) {
-      return ESTRING_CONVERTER.getConverter(type, index, parent);
+      if (hiveSchemaTypeInfos.get(index).getTypeName()
+          .startsWith(serdeConstants.CHAR_TYPE_NAME)) {
+        return ECHAR_CONVERTER.getConverter(type, index, parent);
+      } else if (hiveSchemaTypeInfos.get(index).getTypeName()
+          .startsWith(serdeConstants.VARCHAR_TYPE_NAME)) {
+        return EVARCHAR_CONVERTER.getConverter(type, index, parent);
+      } else if (type.isPrimitive()) {
+        return ESTRING_CONVERTER.getConverter(type, index, parent);
+      }
     }
 
     Class<?> javaType = type.getPrimitiveTypeName().javaType;
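
The hunk above routes Parquet UTF8 binaries to the new ECHAR_CONVERTER / EVARCHAR_CONVERTER entries by inspecting the Hive type name recorded for the column at the same index. A minimal, self-contained sketch of that selection logic follows; the two string prefixes mirror serdeConstants.CHAR_TYPE_NAME and serdeConstants.VARCHAR_TYPE_NAME, and the Parquet types and converter classes are left out.

// Illustrative only: choose a converter from a Hive type string such as
// "char(10)", "varchar(20)" or "string", mirroring the startsWith() checks in the diff.
public class Utf8ConverterDispatchSketch {

  static String pickUtf8Converter(String hiveTypeName) {
    if (hiveTypeName.startsWith("char")) {           // serdeConstants.CHAR_TYPE_NAME
      return "ECHAR_CONVERTER";
    } else if (hiveTypeName.startsWith("varchar")) { // serdeConstants.VARCHAR_TYPE_NAME
      return "EVARCHAR_CONVERTER";
    }
    return "ESTRING_CONVERTER";                      // plain Hive string column
  }

  public static void main(String[] args) {
    System.out.println(pickUtf8Converter("char(10)"));    // ECHAR_CONVERTER
    System.out.println(pickUtf8Converter("varchar(20)")); // EVARCHAR_CONVERTER
    System.out.println(pickUtf8Converter("string"));      // ESTRING_CONVERTER
  }
}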

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java Mon Oct  6 03:44:13 2014
@@ -13,6 +13,9 @@
  */
 package org.apache.hadoop.hive.ql.io.parquet.convert;
 
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.io.Writable;
 
 import parquet.io.api.Converter;
@@ -23,17 +26,20 @@ import parquet.schema.Type.Repetition;
 public abstract class HiveGroupConverter extends GroupConverter {
 
   protected static Converter getConverterFromDescription(final Type type, final int index,
-      final HiveGroupConverter parent) {
+      final HiveGroupConverter parent, List<TypeInfo> hiveSchemaTypeInfos) {
     if (type == null) {
       return null;
     }
     if (type.isPrimitive()) {
-      return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent);
+      return ETypeConverter.getNewConverter(type.asPrimitiveType(), index, parent,
+          hiveSchemaTypeInfos);
     } else {
       if (type.asGroupType().getRepetition() == Repetition.REPEATED) {
-        return new ArrayWritableGroupConverter(type.asGroupType(), parent, index);
+        return new ArrayWritableGroupConverter(type.asGroupType(), parent, index,
+            hiveSchemaTypeInfos);
       } else {
-        return new DataWritableGroupConverter(type.asGroupType(), parent, index);
+        return new DataWritableGroupConverter(type.asGroupType(), parent, index,
+            hiveSchemaTypeInfos);
       }
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java Mon Oct  6 03:44:13 2014
@@ -14,6 +14,7 @@
 package org.apache.hadoop.hive.ql.io.parquet.read;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -23,6 +24,8 @@ import org.apache.hadoop.hive.ql.io.IOCo
 import org.apache.hadoop.hive.ql.io.parquet.convert.DataWritableRecordConverter;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.ArrayWritable;
 import org.apache.hadoop.util.StringUtils;
 
@@ -53,7 +56,7 @@ public class DataWritableReadSupport ext
    * From a string which columns names (including hive column), return a list
    * of string columns
    *
-   * @param columns comma separated list of columns
+   * @param comma separated list of columns
    * @return list with virtual columns removed
    */
   private static List<String> getColumns(final String columns) {
@@ -61,6 +64,27 @@ public class DataWritableReadSupport ext
         removeVirtualColumns(StringUtils.getStringCollection(columns));
   }
 
+  private static List<TypeInfo> getColumnTypes(Configuration configuration) {
+
+    List<String> columnNames;
+    String columnNamesProperty = configuration.get(IOConstants.COLUMNS);
+    if (columnNamesProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNamesProperty.split(","));
+    }
+    List<TypeInfo> columnTypes;
+    String columnTypesProperty = configuration.get(IOConstants.COLUMNS_TYPES);
+    if (columnTypesProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypesProperty);
+    }
+
+    columnTypes = VirtualColumn.removeVirtualColumnTypes(columnNames, columnTypes);
+    return columnTypes;
+  }
+
   /**
    *
    * It creates the readContext for Parquet side with the requested schema during the init phase.
@@ -149,7 +173,8 @@ public class DataWritableReadSupport ext
     }
     final MessageType tableSchema = resolveSchemaAccess(MessageTypeParser.
         parseMessageType(metadata.get(HIVE_SCHEMA_KEY)), fileSchema, configuration);
-    return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema);
+    return new DataWritableRecordConverter(readContext.getRequestedSchema(), tableSchema,
+        getColumnTypes(configuration));
   }
 
   /**
@@ -169,4 +194,4 @@ public class DataWritableReadSupport ext
     }
     return requestedSchema;
   }
-}
\ No newline at end of file
+}
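
The new getColumnTypes() helper pairs the comma-separated column list in IOConstants.COLUMNS with the type string in IOConstants.COLUMNS_TYPES (parsed via TypeInfoUtils) and then strips virtual columns. A rough, dependency-free sketch of how those two properties line up is below; the sample property values are made up, and splitting the type string on ':' only works for simple types (the real parser also handles nested struct/map/array types).

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class ColumnTypesSketch {
  public static void main(String[] args) {
    // Hypothetical values of the two job properties the helper reads.
    String columnsProperty = "id,name,created_at";             // IOConstants.COLUMNS
    String columnTypesProperty = "int:varchar(20):timestamp";  // IOConstants.COLUMNS_TYPES

    List<String> names = columnsProperty.isEmpty()
        ? Collections.<String>emptyList() : Arrays.asList(columnsProperty.split(","));
    List<String> types = columnTypesProperty.isEmpty()
        ? Collections.<String>emptyList() : Arrays.asList(columnTypesProperty.split(":"));

    // Column i is described by types.get(i); virtual columns would be removed afterwards.
    for (int i = 0; i < names.size(); i++) {
      System.out.println(names.get(i) + " -> " + types.get(i));
    }
  }
}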

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbLockManager.java Mon Oct  6 03:44:13 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.lockmg
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.*;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.thrift.TException;
@@ -43,10 +42,10 @@ public class DbLockManager implements Hi
   private static final long MAX_SLEEP = 15000;
   private HiveLockManagerCtx context;
   private Set<DbHiveLock> locks;
-  private IMetaStoreClient client;
+  private HiveMetaStoreClient client;
   private long nextSleep = 50;
 
-  DbLockManager(IMetaStoreClient client) {
+  DbLockManager(HiveMetaStoreClient client) {
     locks = new HashSet<DbHiveLock>();
     this.client = client;
   }
@@ -211,8 +210,8 @@ public class DbLockManager implements Hi
   /**
    * Clear the memory of the locks in this object.  This won't clear the locks from the database.
    * It is for use with
-   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).commitTxn} and
-   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.IMetaStoreClient).rollbackTxn}.
+   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).commitTxn} and
+   * {@link #DbLockManager(org.apache.hadoop.hive.metastore.HiveMetaStoreClient).rollbackTxn}.
    */
   void clearLocalLockRecords() {
     locks.clear();

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Mon Oct  6 03:44:13 2014
@@ -31,8 +31,6 @@ import org.apache.hadoop.hive.ql.QueryPl
 import org.apache.hadoop.hive.ql.hooks.Entity;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.thrift.TException;
 
@@ -48,7 +46,7 @@ public class DbTxnManager extends HiveTx
   static final private Log LOG = LogFactory.getLog(CLASS_NAME);
 
   private DbLockManager lockMgr = null;
-  private IMetaStoreClient client = null;
+  private HiveMetaStoreClient client = null;
   private long txnId = 0;
 
   DbTxnManager() {
@@ -286,7 +284,7 @@ public class DbTxnManager extends HiveTx
   public ValidTxnList getValidTxns() throws LockException {
     init();
     try {
-      return client.getValidTxns(txnId);
+      return client.getValidTxns();
     } catch (TException e) {
       throw new LockException(ErrorMsg.METASTORE_COMMUNICATION_FAILED.getMsg(),
           e);
@@ -313,6 +311,7 @@ public class DbTxnManager extends HiveTx
     try {
       if (txnId > 0) rollbackTxn();
       if (lockMgr != null) lockMgr.close();
+      if (client != null) client.close();
     } catch (Exception e) {
       LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage()
       + ">, swallowing as there is nothing we can do with it.");
@@ -327,12 +326,10 @@ public class DbTxnManager extends HiveTx
             "methods.");
       }
       try {
-        Hive db = Hive.get(conf);
-        client = db.getMSC();
+        client = new HiveMetaStoreClient(conf);
       } catch (MetaException e) {
-        throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
-      } catch (HiveException e) {
-        throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(), e);
+        throw new LockException(ErrorMsg.METASTORE_COULD_NOT_INITIATE.getMsg(),
+            e);
       }
     }
   }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Mon Oct  6 03:44:13 2014
@@ -878,23 +878,6 @@ public class Hive {
 
   /**
    * Drops table along with the data in it. If the table doesn't exist then it
-   * is a no-op. If ifPurge option is specified it is passed to the
-   * hdfs command that removes table data from warehouse to make it skip trash.
-   *
-   * @param tableName
-   *          table to drop
-   * @param ifPurge
-   *          completely purge the table (skipping trash) while removing data from warehouse
-   * @throws HiveException
-   *           thrown if the drop fails
-   */
-  public void dropTable(String tableName, boolean ifPurge) throws HiveException {
-    String[] names = Utilities.getDbTableName(tableName);
-    dropTable(names[0], names[1], true, true, ifPurge);
-  }
-
-  /**
-   * Drops table along with the data in it. If the table doesn't exist then it
    * is a no-op
    *
    * @param tableName
@@ -903,7 +886,8 @@ public class Hive {
    *           thrown if the drop fails
    */
   public void dropTable(String tableName) throws HiveException {
-    dropTable(tableName, false);
+    String[] names = Utilities.getDbTableName(tableName);
+    dropTable(names[0], names[1], true, true);
   }
 
   /**
@@ -918,7 +902,7 @@ public class Hive {
    *           thrown if the drop fails
    */
   public void dropTable(String dbName, String tableName) throws HiveException {
-    dropTable(dbName, tableName, true, true, false);
+    dropTable(dbName, tableName, true, true);
   }
 
   /**
@@ -929,31 +913,14 @@ public class Hive {
    * @param deleteData
    *          deletes the underlying data along with metadata
    * @param ignoreUnknownTab
-   *          an exception is thrown if this is false and the table doesn't exist
+   *          an exception if thrown if this is falser and table doesn't exist
    * @throws HiveException
    */
   public void dropTable(String dbName, String tableName, boolean deleteData,
       boolean ignoreUnknownTab) throws HiveException {
-    dropTable(dbName, tableName, deleteData, ignoreUnknownTab, false);
-  }
 
-  /**
-   * Drops the table.
-   *
-   * @param dbName
-   * @param tableName
-   * @param deleteData
-   *          deletes the underlying data along with metadata
-   * @param ignoreUnknownTab
-   *          an exception is thrown if this is false and the table doesn't exist
-   * @param ifPurge
-   *          completely purge the table skipping trash while removing data from warehouse
-   * @throws HiveException
-   */
-  public void dropTable(String dbName, String tableName, boolean deleteData,
-      boolean ignoreUnknownTab, boolean ifPurge) throws HiveException {
     try {
-      getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab, ifPurge);
+      getMSC().dropTable(dbName, tableName, deleteData, ignoreUnknownTab);
     } catch (NoSuchObjectException e) {
       if (!ignoreUnknownTab) {
         throw new HiveException(e);
@@ -1237,15 +1204,6 @@ public class Hive {
     return getDatabase(currentDb);
   }
 
-  public void loadPartition(Path loadPath, String tableName,
-      Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
-      boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
-      boolean isSrcLocal, boolean isAcid) throws HiveException {
-    Table tbl = getTable(tableName);
-    loadPartition(loadPath, tbl, partSpec, replace, holdDDLTime, inheritTableSpecs,
-        isSkewedStoreAsSubdir, isSrcLocal, isAcid);
-  }
-
   /**
    * Load a directory into a Hive Table Partition - Alters existing content of
    * the partition with the contents of loadPath. - If the partition does not
@@ -1254,7 +1212,7 @@ public class Hive {
    *
    * @param loadPath
    *          Directory containing files to load into Table
-   * @param  tbl
+   * @param tableName
    *          name of table to be loaded.
    * @param partSpec
    *          defines which partition needs to be loaded
@@ -1267,12 +1225,12 @@ public class Hive {
    * @param isSrcLocal
    *          If the source directory is LOCAL
    */
-  public Partition loadPartition(Path loadPath, Table tbl,
+  public void loadPartition(Path loadPath, String tableName,
       Map<String, String> partSpec, boolean replace, boolean holdDDLTime,
       boolean inheritTableSpecs, boolean isSkewedStoreAsSubdir,
       boolean isSrcLocal, boolean isAcid) throws HiveException {
+    Table tbl = getTable(tableName);
     Path tblDataLocationPath =  tbl.getDataLocation();
-    Partition newTPart = null;
     try {
       /**
        * Move files before creating the partition since down stream processes
@@ -1321,10 +1279,10 @@ public class Hive {
         Hive.copyFiles(conf, loadPath, newPartPath, fs, isSrcLocal, isAcid);
       }
 
-      boolean forceCreate = (!holdDDLTime) ? true : false;
-      newTPart = getPartition(tbl, partSpec, forceCreate, newPartPath.toString(), inheritTableSpecs);
       // recreate the partition if it existed before
       if (!holdDDLTime) {
+        Partition newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(),
+            inheritTableSpecs);
         if (isSkewedStoreAsSubdir) {
           org.apache.hadoop.hive.metastore.api.Partition newCreatedTpart = newTPart.getTPartition();
           SkewedInfo skewedInfo = newCreatedTpart.getSd().getSkewedInfo();
@@ -1334,9 +1292,9 @@ public class Hive {
           /* Add list bucketing location mappings. */
           skewedInfo.setSkewedColValueLocationMaps(skewedColValueLocationMaps);
           newCreatedTpart.getSd().setSkewedInfo(skewedInfo);
-          alterPartition(tbl.getDbName(), tbl.getTableName(), new Partition(tbl, newCreatedTpart));
+          alterPartition(tbl.getTableName(), new Partition(tbl, newCreatedTpart));
           newTPart = getPartition(tbl, partSpec, true, newPartPath.toString(), inheritTableSpecs);
-          return new Partition(tbl, newCreatedTpart);
+          newCreatedTpart = newTPart.getTPartition();
         }
       }
     } catch (IOException e) {
@@ -1349,7 +1307,7 @@ public class Hive {
       LOG.error(StringUtils.stringifyException(e));
       throw new HiveException(e);
     }
-    return newTPart;
+
   }
 
   /**
@@ -1445,18 +1403,18 @@ private void constructOneLBLocationMap(F
    * @param replace
    * @param numDP number of dynamic partitions
    * @param holdDDLTime
-   * @return partition map details (PartitionSpec and Partition)
+   * @return a list of strings with the dynamic partition paths
    * @throws HiveException
    */
-  public Map<Map<String, String>, Partition> loadDynamicPartitions(Path loadPath,
+  public ArrayList<LinkedHashMap<String, String>> loadDynamicPartitions(Path loadPath,
       String tableName, Map<String, String> partSpec, boolean replace,
       int numDP, boolean holdDDLTime, boolean listBucketingEnabled, boolean isAcid)
       throws HiveException {
 
     Set<Path> validPartitions = new HashSet<Path>();
     try {
-      Map<Map<String, String>, Partition> partitionsMap = new
-          LinkedHashMap<Map<String, String>, Partition>();
+      ArrayList<LinkedHashMap<String, String>> fullPartSpecs =
+        new ArrayList<LinkedHashMap<String, String>>();
 
       FileSystem fs = loadPath.getFileSystem(conf);
       FileStatus[] leafStatus = HiveStatsUtils.getFileStatusRecurse(loadPath, numDP+1, fs);
@@ -1490,7 +1448,6 @@ private void constructOneLBLocationMap(F
             + " to at least " + validPartitions.size() + '.');
       }
 
-      Table tbl = getTable(tableName);
       // for each dynamically created DP directory, construct a full partition spec
       // and load the partition based on that
       Iterator<Path> iter = validPartitions.iterator();
@@ -1503,12 +1460,14 @@ private void constructOneLBLocationMap(F
         // generate a full partition specification
         LinkedHashMap<String, String> fullPartSpec = new LinkedHashMap<String, String>(partSpec);
         Warehouse.makeSpecFromName(fullPartSpec, partPath);
-        Partition newPartition = loadPartition(partPath, tbl, fullPartSpec, replace,
-            holdDDLTime, true, listBucketingEnabled, false, isAcid);
-        partitionsMap.put(fullPartSpec, newPartition);
+        fullPartSpecs.add(fullPartSpec);
+
+        // finally load the partition -- move the file to the final table address
+        loadPartition(partPath, tableName, fullPartSpec, replace, holdDDLTime, true,
+            listBucketingEnabled, false, isAcid);
         LOG.info("New loading path = " + partPath + " with partSpec " + fullPartSpec);
       }
-      return partitionsMap;
+      return fullPartSpecs;
     } catch (IOException e) {
       throw new HiveException(e);
     }
@@ -1777,7 +1736,6 @@ private void constructOneLBLocationMap(F
   public List<Partition> dropPartitions(String dbName, String tblName,
       List<DropTableDesc.PartSpec> partSpecs,  boolean deleteData, boolean ignoreProtection,
       boolean ifExists) throws HiveException {
-    //TODO: add support for ifPurge
     try {
       Table tbl = getTable(dbName, tblName);
       List<ObjectPair<Integer, byte[]>> partExprs =
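
In the loadDynamicPartitions() version this change moves to, each dynamically created directory is turned into a full partition spec with Warehouse.makeSpecFromName and collected into a list of LinkedHashMaps. A toy version of that name-to-spec step (ignoring the path unescaping the real utility performs) looks like this:

import java.util.LinkedHashMap;
import java.util.Map;

public class PartSpecSketch {
  // Turn "ds=2014-10-06/hr=00" into an ordered {ds=2014-10-06, hr=00} spec.
  static Map<String, String> makeSpecFromName(String partPath) {
    LinkedHashMap<String, String> spec = new LinkedHashMap<String, String>();
    for (String component : partPath.split("/")) {
      String[] kv = component.split("=", 2);
      if (kv.length == 2) {
        spec.put(kv[0], kv[1]);   // LinkedHashMap keeps the partition-column order
      }
    }
    return spec;
  }

  public static void main(String[] args) {
    System.out.println(makeSpecFromName("ds=2014-10-06/hr=00"));
  }
}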

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/metadata/SessionHiveMetaStoreClient.java Mon Oct  6 03:44:13 2014
@@ -451,11 +451,7 @@ public class SessionHiveMetaStoreClient 
     // Delete table data
     if (deleteData && !MetaStoreUtils.isExternalTable(table)) {
       try {
-        boolean ifPurge = false;
-        if (envContext != null){
-          ifPurge = Boolean.parseBoolean(envContext.getProperties().get("ifPurge"));
-        }
-        getWh().deleteDir(tablePath, true, ifPurge);
+        getWh().deleteDir(tablePath, true);
       } catch (Exception err) {
         LOG.error("Failed to delete temp table directory: " + tablePath, err);
         // Forgive error

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConstantPropagateProcFactory.java Mon Oct  6 03:44:13 2014
@@ -670,15 +670,10 @@ public final class ConstantPropagateProc
       cppCtx.getOpToConstantExprs().put(op, constants);
       foldOperator(op, cppCtx);
       List<ExprNodeDesc> colList = op.getConf().getColList();
-      List<String> columnNames = op.getConf().getOutputColumnNames();
-      Map<String, ExprNodeDesc> columnExprMap = op.getColumnExprMap();
       if (colList != null) {
         for (int i = 0; i < colList.size(); i++) {
           ExprNodeDesc newCol = foldExpr(colList.get(i), constants, cppCtx, op, 0, false);
           colList.set(i, newCol);
-          if (columnExprMap != null) {
-            columnExprMap.put(columnNames.get(i), newCol);
-          }
         }
         LOG.debug("New column list:(" + StringUtils.join(colList, " ") + ")");
       }
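
The hunk above goes back to folding each projected expression in place in the select operator's column list, dropping the columnExprMap bookkeeping. The loop amounts to: for each expression, ask the folder for a constant replacement and overwrite the list entry if one is found. A toy version with plain strings standing in for ExprNodeDesc:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FoldColListSketch {
  public static void main(String[] args) {
    // Constants discovered so far, e.g. from a "WHERE a = 3" predicate.
    Map<String, String> constants = new HashMap<String, String>();
    constants.put("a", "3");

    List<String> colList = new ArrayList<String>(Arrays.asList("a", "b", "7"));
    for (int i = 0; i < colList.size(); i++) {
      String folded = constants.get(colList.get(i));  // stands in for foldExpr(...)
      if (folded != null) {
        colList.set(i, folded);                       // overwrite the projected expression
      }
    }
    System.out.println(colList);                      // [3, b, 7]
  }
}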

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java Mon Oct  6 03:44:13 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -30,17 +29,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.AppMasterEventOperator;
-import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
-import org.apache.hadoop.hive.ql.exec.DummyStoreOperator;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.exec.MuxOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
-import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
-import org.apache.hadoop.hive.ql.exec.TezDummyStoreOperator;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.lib.NodeProcessor;
 import org.apache.hadoop.hive.ql.lib.NodeProcessorCtx;
@@ -48,16 +42,12 @@ import org.apache.hadoop.hive.ql.parse.O
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.DynamicPruningEventDesc;
-import org.apache.hadoop.hive.ql.plan.CommonMergeJoinDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.JoinCondDesc;
-import org.apache.hadoop.hive.ql.plan.JoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.OpTraits;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.Statistics;
-import org.apache.hadoop.util.ReflectionUtils;
 
 /**
  * ConvertJoinMapJoin is an optimization that replaces a common join
@@ -70,46 +60,39 @@ public class ConvertJoinMapJoin implemen
 
   static final private Log LOG = LogFactory.getLog(ConvertJoinMapJoin.class.getName());
 
-  @SuppressWarnings("unchecked")
   @Override
-  /*
-   * (non-Javadoc) we should ideally not modify the tree we traverse. However,
-   * since we need to walk the tree at any time when we modify the operator, we
-   * might as well do it here.
-   */
-  public Object
-      process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx, Object... nodeOutputs)
-          throws SemanticException {
+    /*
+     * (non-Javadoc)
+     * we should ideally not modify the tree we traverse.
+     * However, since we need to walk the tree at any time when we modify the
+     * operator, we might as well do it here.
+     */
+    public Object process(Node nd, Stack<Node> stack,
+        NodeProcessorCtx procCtx, Object... nodeOutputs)
+    throws SemanticException {
 
     OptimizeTezProcContext context = (OptimizeTezProcContext) procCtx;
 
-    JoinOperator joinOp = (JoinOperator) nd;
-
-    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)
-        && !(context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN))) {
-      // we are just converting to a common merge join operator. The shuffle
-      // join in map-reduce case.
-      int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
+    if (!context.conf.getBoolVar(HiveConf.ConfVars.HIVECONVERTJOIN)) {
       return null;
     }
 
-    // if we have traits, and table info is present in the traits, we know the
+    JoinOperator joinOp = (JoinOperator) nd;
+    // if we have traits, and table info is present in the traits, we know the 
     // exact number of buckets. Else choose the largest number of estimated
     // reducers from the parent operators.
     int numBuckets = -1;
     int estimatedBuckets = -1;
-    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
     if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
       for (Operator<? extends OperatorDesc>parentOp : joinOp.getParentOperators()) {
         if (parentOp.getOpTraits().getNumBuckets() > 0) {
-          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ?
-              parentOp.getOpTraits().getNumBuckets() : numBuckets;
+          numBuckets = (numBuckets < parentOp.getOpTraits().getNumBuckets()) ? 
+              parentOp.getOpTraits().getNumBuckets() : numBuckets; 
         }
 
         if (parentOp instanceof ReduceSinkOperator) {
           ReduceSinkOperator rs = (ReduceSinkOperator)parentOp;
-          estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ?
+          estimatedBuckets = (estimatedBuckets < rs.getConf().getNumReducers()) ? 
               rs.getConf().getNumReducers() : estimatedBuckets;
         }
       }
@@ -124,80 +107,29 @@ public class ConvertJoinMapJoin implemen
       numBuckets = 1;
     }
     LOG.info("Estimated number of buckets " + numBuckets);
-    int mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, numBuckets);
+    int mapJoinConversionPos = mapJoinConversionPos(joinOp, context, numBuckets);
     if (mapJoinConversionPos < 0) {
-      // we cannot convert to bucket map join, we cannot convert to
-      // map join either based on the size. Check if we can convert to SMB join.
-      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN) == false) {
-        convertJoinSMBJoin(joinOp, context, 0, 0, false, false);
-        return null;
-      }
-      Class<? extends BigTableSelectorForAutoSMJ> bigTableMatcherClass = null;
-      try {
-        bigTableMatcherClass =
-            (Class<? extends BigTableSelectorForAutoSMJ>) (Class.forName(HiveConf.getVar(
-                context.parseContext.getConf(),
-                HiveConf.ConfVars.HIVE_AUTO_SORTMERGE_JOIN_BIGTABLE_SELECTOR)));
-      } catch (ClassNotFoundException e) {
-        throw new SemanticException(e.getMessage());
-      }
-
-      BigTableSelectorForAutoSMJ bigTableMatcher =
-          ReflectionUtils.newInstance(bigTableMatcherClass, null);
-      JoinDesc joinDesc = joinOp.getConf();
-      JoinCondDesc[] joinCondns = joinDesc.getConds();
-      Set<Integer> joinCandidates = MapJoinProcessor.getBigTableCandidates(joinCondns);
-      if (joinCandidates.isEmpty()) {
-        // This is a full outer join. This can never be a map-join
-        // of any type. So return false.
-        return false;
-      }
-      mapJoinConversionPos =
-          bigTableMatcher.getBigTablePosition(context.parseContext, joinOp, joinCandidates);
-      if (mapJoinConversionPos < 0) {
-        // contains aliases from sub-query
-        // we are just converting to a common merge join operator. The shuffle
-        // join in map-reduce case.
-        int pos = 0; // it doesn't matter which position we use in this case.
-        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
-        return null;
-      }
-
-      if (checkConvertJoinSMBJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
-        convertJoinSMBJoin(joinOp, context, mapJoinConversionPos,
-            tezBucketJoinProcCtx.getNumBuckets(), tezBucketJoinProcCtx.isSubQuery(), true);
-      } else {
-        // we are just converting to a common merge join operator. The shuffle
-        // join in map-reduce case.
-        int pos = 0; // it doesn't matter which position we use in this case.
-        convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
-      }
+      // we cannot convert to bucket map join, we cannot convert to 
+      // map join either based on the size
       return null;
     }
 
-    if (numBuckets > 1) {
-      if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
-        if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos, tezBucketJoinProcCtx)) {
-          return null;
-        }
+    if (context.conf.getBoolVar(HiveConf.ConfVars.HIVE_CONVERT_JOIN_BUCKET_MAPJOIN_TEZ)) {
+      if (convertJoinBucketMapJoin(joinOp, context, mapJoinConversionPos)) {
+        return null;
       }
     }
 
     LOG.info("Convert to non-bucketed map join");
     // check if we can convert to map join no bucket scaling.
-    mapJoinConversionPos = getMapJoinConversionPos(joinOp, context, 1);
+    mapJoinConversionPos = mapJoinConversionPos(joinOp, context, 1);
     if (mapJoinConversionPos < 0) {
-      // we are just converting to a common merge join operator. The shuffle
-      // join in map-reduce case.
-      int pos = 0; // it doesn't matter which position we use in this case.
-      convertJoinSMBJoin(joinOp, context, pos, 0, false, false);
       return null;
     }
 
     MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, mapJoinConversionPos);
     // map join operator by default has no bucket cols
-    mapJoinOp.setOpTraits(new OpTraits(null, -1, null));
-    mapJoinOp.setStatistics(joinOp.getStatistics());
+    mapJoinOp.setOpTraits(new OpTraits(null, -1));
     // propagate this change till the next RS
     for (Operator<? extends OperatorDesc> childOp : mapJoinOp.getChildOperators()) {
       setAllChildrenTraitsToNull(childOp);
@@ -206,107 +138,11 @@ public class ConvertJoinMapJoin implemen
     return null;
   }
 
-  // replaces the join operator with a new CommonJoinOperator, removes the
-  // parent reduce sinks
-  private void convertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
-      int mapJoinConversionPos, int numBuckets, boolean isSubQuery, boolean adjustParentsChildren)
-      throws SemanticException {
-    ParseContext parseContext = context.parseContext;
-    MapJoinDesc mapJoinDesc = null;
-    if (adjustParentsChildren) {
-        mapJoinDesc = MapJoinProcessor.getMapJoinDesc(context.conf, parseContext.getOpParseCtx(),
-            joinOp, parseContext.getJoinContext().get(joinOp), mapJoinConversionPos, true);
-    } else {
-      JoinDesc joinDesc = joinOp.getConf();
-      // retain the original join desc in the map join.
-      mapJoinDesc =
-          new MapJoinDesc(null, null, joinDesc.getExprs(), null, null,
-              joinDesc.getOutputColumnNames(), mapJoinConversionPos, joinDesc.getConds(),
-              joinDesc.getFilters(), joinDesc.getNoOuterJoin(), null);
-    }
-
-    @SuppressWarnings("unchecked")
-    CommonMergeJoinOperator mergeJoinOp =
-        (CommonMergeJoinOperator) OperatorFactory.get(new CommonMergeJoinDesc(numBuckets,
-            isSubQuery, mapJoinConversionPos, mapJoinDesc));
-    OpTraits opTraits =
-        new OpTraits(joinOp.getOpTraits().getBucketColNames(), numBuckets, joinOp.getOpTraits()
-            .getSortCols());
-    mergeJoinOp.setOpTraits(opTraits);
-    mergeJoinOp.setStatistics(joinOp.getStatistics());
-
-    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
-      int pos = parentOp.getChildOperators().indexOf(joinOp);
-      parentOp.getChildOperators().remove(pos);
-      parentOp.getChildOperators().add(pos, mergeJoinOp);
-    }
-
-    for (Operator<? extends OperatorDesc> childOp : joinOp.getChildOperators()) {
-      int pos = childOp.getParentOperators().indexOf(joinOp);
-      childOp.getParentOperators().remove(pos);
-      childOp.getParentOperators().add(pos, mergeJoinOp);
-    }
-
-    List<Operator<? extends OperatorDesc>> childOperators = mergeJoinOp.getChildOperators();
-    if (childOperators == null) {
-      childOperators = new ArrayList<Operator<? extends OperatorDesc>>();
-      mergeJoinOp.setChildOperators(childOperators);
-    }
-
-    List<Operator<? extends OperatorDesc>> parentOperators = mergeJoinOp.getParentOperators();
-    if (parentOperators == null) {
-      parentOperators = new ArrayList<Operator<? extends OperatorDesc>>();
-      mergeJoinOp.setParentOperators(parentOperators);
-    }
-
-    childOperators.clear();
-    parentOperators.clear();
-    childOperators.addAll(joinOp.getChildOperators());
-    parentOperators.addAll(joinOp.getParentOperators());
-    mergeJoinOp.getConf().setGenJoinKeys(false);
-
-    if (adjustParentsChildren) {
-      mergeJoinOp.getConf().setGenJoinKeys(true);
-      List<Operator<? extends OperatorDesc>> newParentOpList =
-          new ArrayList<Operator<? extends OperatorDesc>>();
-      for (Operator<? extends OperatorDesc> parentOp : mergeJoinOp.getParentOperators()) {
-        for (Operator<? extends OperatorDesc> grandParentOp : parentOp.getParentOperators()) {
-          grandParentOp.getChildOperators().remove(parentOp);
-          grandParentOp.getChildOperators().add(mergeJoinOp);
-          newParentOpList.add(grandParentOp);
-        }
-      }
-      mergeJoinOp.getParentOperators().clear();
-      mergeJoinOp.getParentOperators().addAll(newParentOpList);
-      List<Operator<? extends OperatorDesc>> parentOps =
-          new ArrayList<Operator<? extends OperatorDesc>>(mergeJoinOp.getParentOperators());
-      for (Operator<? extends OperatorDesc> parentOp : parentOps) {
-        int parentIndex = mergeJoinOp.getParentOperators().indexOf(parentOp);
-        if (parentIndex == mapJoinConversionPos) {
-          continue;
-        }
-
-        // insert the dummy store operator here
-        DummyStoreOperator dummyStoreOp = new TezDummyStoreOperator();
-        dummyStoreOp.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>());
-        dummyStoreOp.setChildOperators(new ArrayList<Operator<? extends OperatorDesc>>());
-        dummyStoreOp.getChildOperators().add(mergeJoinOp);
-        int index = parentOp.getChildOperators().indexOf(mergeJoinOp);
-        parentOp.getChildOperators().remove(index);
-        parentOp.getChildOperators().add(index, dummyStoreOp);
-        dummyStoreOp.getParentOperators().add(parentOp);
-        mergeJoinOp.getParentOperators().remove(parentIndex);
-        mergeJoinOp.getParentOperators().add(parentIndex, dummyStoreOp);
-      }
-    }
-    mergeJoinOp.cloneOriginalParentsList(mergeJoinOp.getParentOperators());
-  }
-
   private void setAllChildrenTraitsToNull(Operator<? extends OperatorDesc> currentOp) {
     if (currentOp instanceof ReduceSinkOperator) {
       return;
     }
-    currentOp.setOpTraits(new OpTraits(null, -1, null));
+    currentOp.setOpTraits(new OpTraits(null, -1));
     for (Operator<? extends OperatorDesc> childOp : currentOp.getChildOperators()) {
       if ((childOp instanceof ReduceSinkOperator) || (childOp instanceof GroupByOperator)) {
         break;
@@ -315,26 +151,28 @@ public class ConvertJoinMapJoin implemen
     }
   }
 
-  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
-      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
+  private boolean convertJoinBucketMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
+      int bigTablePosition) throws SemanticException {
+
+    TezBucketJoinProcCtx tezBucketJoinProcCtx = new TezBucketJoinProcCtx(context.conf);
 
     if (!checkConvertJoinBucketMapJoin(joinOp, context, bigTablePosition, tezBucketJoinProcCtx)) {
       LOG.info("Check conversion to bucket map join failed.");
       return false;
     }
 
-    MapJoinOperator mapJoinOp = convertJoinMapJoin(joinOp, context, bigTablePosition);
+    MapJoinOperator mapJoinOp = 
+      convertJoinMapJoin(joinOp, context, bigTablePosition);
     MapJoinDesc joinDesc = mapJoinOp.getConf();
     joinDesc.setBucketMapJoin(true);
 
     // we can set the traits for this join operator
     OpTraits opTraits = new OpTraits(joinOp.getOpTraits().getBucketColNames(),
-        tezBucketJoinProcCtx.getNumBuckets(), null);
+        tezBucketJoinProcCtx.getNumBuckets());
     mapJoinOp.setOpTraits(opTraits);
-    mapJoinOp.setStatistics(joinOp.getStatistics());
     setNumberOfBucketsOnChildren(mapJoinOp);
 
-    // Once the conversion is done, we can set the partitioner to bucket cols on the small table
+    // Once the conversion is done, we can set the partitioner to bucket cols on the small table    
     Map<String, Integer> bigTableBucketNumMapping = new HashMap<String, Integer>();
     bigTableBucketNumMapping.put(joinDesc.getBigTableAlias(), tezBucketJoinProcCtx.getNumBuckets());
     joinDesc.setBigTableBucketNumMapping(bigTableBucketNumMapping);
@@ -344,54 +182,6 @@ public class ConvertJoinMapJoin implemen
     return true;
   }
 
-  /*
-   * This method tries to convert a join to an SMB. This is done based on
-   * traits. If the sorted by columns are the same as the join columns then, we
-   * can convert the join to an SMB. Otherwise retain the bucket map join as it
-   * is still more efficient than a regular join.
-   */
-  private boolean checkConvertJoinSMBJoin(JoinOperator joinOp, OptimizeTezProcContext context,
-      int bigTablePosition, TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
-
-    ReduceSinkOperator bigTableRS =
-        (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
-    int numBuckets = bigTableRS.getParentOperators().get(0).getOpTraits()
-            .getNumBuckets();
-
-    // the sort and bucket cols have to match on both sides for this
-    // transformation of the join operation
-    for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
-      if (!(parentOp instanceof ReduceSinkOperator)) {
-        // could be mux/demux operators. Currently not supported
-        LOG.info("Found correlation optimizer operators. Cannot convert to SMB at this time.");
-        return false;
-      }
-      ReduceSinkOperator rsOp = (ReduceSinkOperator) parentOp;
-      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getSortCols(), rsOp
-          .getOpTraits().getSortCols(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx) == false) {
-        LOG.info("We cannot convert to SMB because the sort column names do not match.");
-        return false;
-      }
-
-      if (checkColEquality(rsOp.getParentOperators().get(0).getOpTraits().getBucketColNames(), rsOp
-          .getOpTraits().getBucketColNames(), rsOp.getColumnExprMap(), tezBucketJoinProcCtx)
-          == false) {
-        LOG.info("We cannot convert to SMB because bucket column names do not match.");
-        return false;
-      }
-    }
-
-    boolean isSubQuery = false;
-    if (numBuckets < 0) {
-      isSubQuery = true;
-      numBuckets = bigTableRS.getConf().getNumReducers();
-    }
-    tezBucketJoinProcCtx.setNumBuckets(numBuckets);
-    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
-    LOG.info("We can convert the join to an SMB join.");
-    return true;
-  }
-
   private void setNumberOfBucketsOnChildren(Operator<? extends OperatorDesc> currentOp) {
     int numBuckets = currentOp.getOpTraits().getNumBuckets();
     for (Operator<? extends OperatorDesc>op : currentOp.getChildOperators()) {
@@ -403,13 +193,15 @@ public class ConvertJoinMapJoin implemen
   }
 
   /*
-   * If the parent reduce sink of the big table side has the same emit key cols
-   * as its parent, we can create a bucket map join eliminating the reduce sink.
+   *  We perform the following checks to see if we can convert to a bucket map join
+   *  1. If the parent reduce sink of the big table side has the same emit key cols as 
+   *  its parent, we can create a bucket map join eliminating the reduce sink.
+   *  2. If we have the table information, we can check the same way as in Mapreduce to 
+   *  determine if we can perform a Bucket Map Join.
    */
-  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp,
-      OptimizeTezProcContext context, int bigTablePosition,
-      TezBucketJoinProcCtx tezBucketJoinProcCtx)
-  throws SemanticException {
+  private boolean checkConvertJoinBucketMapJoin(JoinOperator joinOp, 
+      OptimizeTezProcContext context, int bigTablePosition, 
+      TezBucketJoinProcCtx tezBucketJoinProcCtx) throws SemanticException {
     // bail on mux-operator because mux operator masks the emit keys of the
     // constituent reduce sinks
     if (!(joinOp.getParentOperators().get(0) instanceof ReduceSinkOperator)) {
@@ -419,41 +211,14 @@ public class ConvertJoinMapJoin implemen
     }
 
     ReduceSinkOperator rs = (ReduceSinkOperator) joinOp.getParentOperators().get(bigTablePosition);
-    List<List<String>> parentColNames = rs.getOpTraits().getBucketColNames();
-    Operator<? extends OperatorDesc> parentOfParent = rs.getParentOperators().get(0);
-    List<List<String>> grandParentColNames = parentOfParent.getOpTraits().getBucketColNames();
-    int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
-    // all keys matched.
-    if (checkColEquality(grandParentColNames, parentColNames, rs.getColumnExprMap(),
-        tezBucketJoinProcCtx) == false) {
-      LOG.info("No info available to check for bucket map join. Cannot convert");
-      return false;
-    }
-
     /*
      * this is the case when the big table is a sub-query and is probably
-     * already bucketed by the join column in say a group by operation
+     * already bucketed by the join column in say a group by operation 
      */
-    boolean isSubQuery = false;
-    if (numBuckets < 0) {
-      isSubQuery = true;
-      numBuckets = rs.getConf().getNumReducers();
-    }
-    tezBucketJoinProcCtx.setNumBuckets(numBuckets);
-    tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
-    return true;
-  }
-
-  private boolean checkColEquality(List<List<String>> grandParentColNames,
-      List<List<String>> parentColNames, Map<String, ExprNodeDesc> colExprMap,
-      TezBucketJoinProcCtx tezBucketJoinProcCtx) {
-
-    if ((grandParentColNames == null) || (parentColNames == null)) {
-      return false;
-    }
-
-    if ((parentColNames != null) && (parentColNames.isEmpty() == false)) {
-      for (List<String> listBucketCols : grandParentColNames) {
+    List<List<String>> colNames = rs.getParentOperators().get(0).getOpTraits().getBucketColNames();
+    if ((colNames != null) && (colNames.isEmpty() == false)) {
+      Operator<? extends OperatorDesc>parentOfParent = rs.getParentOperators().get(0);
+      for (List<String>listBucketCols : parentOfParent.getOpTraits().getBucketColNames()) {
         // can happen if this operator does not carry forward the previous bucketing columns
         // for e.g. another join operator which does not carry one of the sides' key columns
         if (listBucketCols.isEmpty()) {
@@ -461,9 +226,9 @@ public class ConvertJoinMapJoin implemen
         }
         int colCount = 0;
         // parent op is guaranteed to have a single list because it is a reduce sink
-        for (String colName : parentColNames.get(0)) {
+        for (String colName : rs.getOpTraits().getBucketColNames().get(0)) {
           // all columns need to be at least a subset of the parentOfParent's bucket cols
-          ExprNodeDesc exprNodeDesc = colExprMap.get(colName);
+          ExprNodeDesc exprNodeDesc = rs.getColumnExprMap().get(colName);
           if (exprNodeDesc instanceof ExprNodeColumnDesc) {
             if (((ExprNodeColumnDesc)exprNodeDesc).getColumn().equals(listBucketCols.get(colCount))) {
               colCount++;
@@ -471,21 +236,32 @@ public class ConvertJoinMapJoin implemen
               break;
             }
           }
-
-          if (colCount == parentColNames.get(0).size()) {
+          
+          if (colCount == rs.getOpTraits().getBucketColNames().get(0).size()) {
+            // all keys matched.
+            int numBuckets = parentOfParent.getOpTraits().getNumBuckets();
+            boolean isSubQuery = false;
+            if (numBuckets < 0) {
+              isSubQuery = true;
+              numBuckets = rs.getConf().getNumReducers();
+            }
+            tezBucketJoinProcCtx.setNumBuckets(numBuckets);
+            tezBucketJoinProcCtx.setIsSubQuery(isSubQuery);
             return true;
           }
         }
       }
       return false;
     }
+
+    LOG.info("No info available to check for bucket map join. Cannot convert");
     return false;
   }
 
-  public int getMapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context,
+  public int mapJoinConversionPos(JoinOperator joinOp, OptimizeTezProcContext context, 
       int buckets) {
-    Set<Integer> bigTableCandidateSet =
-        MapJoinProcessor.getBigTableCandidates(joinOp.getConf().getConds());
+    Set<Integer> bigTableCandidateSet = MapJoinProcessor.
+      getBigTableCandidates(joinOp.getConf().getConds());
 
     long maxSize = context.conf.getLongVar(
         HiveConf.ConfVars.HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD);
@@ -511,7 +287,7 @@ public class ConvertJoinMapJoin implemen
       long inputSize = currInputStat.getDataSize();
       if ((bigInputStat == null) ||
           ((bigInputStat != null) &&
-          (inputSize > bigInputStat.getDataSize()))) {
+           (inputSize > bigInputStat.getDataSize()))) {
 
         if (bigTableFound) {
           // cannot convert to map join; we've already chosen a big table
@@ -571,9 +347,9 @@ public class ConvertJoinMapJoin implemen
    * for tez.
    */
 
-  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context,
+  public MapJoinOperator convertJoinMapJoin(JoinOperator joinOp, OptimizeTezProcContext context, 
       int bigTablePosition) throws SemanticException {
-    // bail on mux operator because currently the mux operator masks the emit keys
+    // bail on mux operator because currently the mux operator masks the emit keys 
     // of the constituent reduce sinks.
     for (Operator<? extends OperatorDesc> parentOp : joinOp.getParentOperators()) {
       if (parentOp instanceof MuxOperator) {
@@ -583,12 +359,12 @@ public class ConvertJoinMapJoin implemen
 
     //can safely convert the join to a map join.
     ParseContext parseContext = context.parseContext;
-    MapJoinOperator mapJoinOp =
-        MapJoinProcessor.convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(), joinOp,
-            parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
+    MapJoinOperator mapJoinOp = MapJoinProcessor.
+      convertJoinOpMapJoinOp(context.conf, parseContext.getOpParseCtx(),
+          joinOp, parseContext.getJoinContext().get(joinOp), bigTablePosition, true);
 
-    Operator<? extends OperatorDesc> parentBigTableOp =
-        mapJoinOp.getParentOperators().get(bigTablePosition);
+    Operator<? extends OperatorDesc> parentBigTableOp
+      = mapJoinOp.getParentOperators().get(bigTablePosition);
     if (parentBigTableOp instanceof ReduceSinkOperator) {
       for (Operator<?> p : parentBigTableOp.getParentOperators()) {
         // we might have generated a dynamic partition operator chain. Since
@@ -604,10 +380,11 @@ public class ConvertJoinMapJoin implemen
         }
       }
       mapJoinOp.getParentOperators().remove(bigTablePosition);
-      if (!(mapJoinOp.getParentOperators().contains(parentBigTableOp.getParentOperators().get(0)))) {
+      if (!(mapJoinOp.getParentOperators().contains(
+              parentBigTableOp.getParentOperators().get(0)))) {
         mapJoinOp.getParentOperators().add(bigTablePosition,
             parentBigTableOp.getParentOperators().get(0));
-      }
+              }
       parentBigTableOp.getParentOperators().get(0).removeChild(parentBigTableOp);
       for (Operator<? extends OperatorDesc> op : mapJoinOp.getParentOperators()) {
         if (!(op.getChildOperators().contains(mapJoinOp))) {
@@ -620,31 +397,15 @@ public class ConvertJoinMapJoin implemen
     return mapJoinOp;
   }
 
-  private boolean hasDynamicPartitionBroadcast(Operator<?> parent) {
-    boolean hasDynamicPartitionPruning = false;
-
-    for (Operator<?> op: parent.getChildOperators()) {
-      while (op != null) {
-        if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
-          // found dynamic partition pruning operator
-          hasDynamicPartitionPruning = true;
-          break;
-        }
-      
-        if (op instanceof ReduceSinkOperator || op instanceof FileSinkOperator) {
-          // crossing reduce sink or file sink means the pruning isn't for this parent.
-          break;
-        }
-
-        if (op.getChildOperators().size() != 1) {
-          // dynamic partition pruning pipeline doesn't have multiple children
-          break;
-        }
-
-        op = op.getChildOperators().get(0);
+  private boolean hasDynamicPartitionBroadcast(Operator<?> op) {
+    if (op instanceof AppMasterEventOperator && op.getConf() instanceof DynamicPruningEventDesc) {
+      return true;
+    }
+    for (Operator<?> c : op.getChildOperators()) {
+      if (hasDynamicPartitionBroadcast(c)) {
+        return true;
       }
     }
-
-    return hasDynamicPartitionPruning;
+    return false;
   }
 }
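
The bulk of this file's diff removes the SMB-join conversion paths, leaving the map-join decision in mapJoinConversionPos(): roughly, pick the position of the largest input and only convert if the remaining inputs together fit under the HIVECONVERTJOINNOCONDITIONALTASKTHRESHOLD limit read a few hunks above. A simplified sketch of that size check, with plain longs standing in for the operators' Statistics (the real method also handles big-table candidates for outer joins and bucket scaling):

public class BigTableSelectSketch {
  // Returns the position of the chosen big table, or -1 if the join cannot
  // become a map join because the other inputs are too large to broadcast.
  static int pickBigTable(long[] inputSizes, long maxSmallTablesSize) {
    long totalSize = 0;
    long bigSize = -1;
    int bigPos = -1;
    for (int pos = 0; pos < inputSizes.length; pos++) {
      totalSize += inputSizes[pos];
      if (inputSizes[pos] > bigSize) {
        bigSize = inputSizes[pos];
        bigPos = pos;
      }
    }
    return (totalSize - bigSize) <= maxSmallTablesSize ? bigPos : -1;
  }

  public static void main(String[] args) {
    System.out.println(pickBigTable(new long[] {10000000L, 5000L}, 10000L));    // 0: small side fits
    System.out.println(pickBigTable(new long[] {10000000L, 5000000L}, 10000L)); // -1: keep shuffle join
  }
}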

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Oct  6 03:44:13 2014
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hive.ql.optimizer;
 
-import com.google.common.collect.Interner;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
@@ -40,6 +39,8 @@ import org.apache.hadoop.hive.ql.exec.No
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.OrcFileMergeOperator;
+import org.apache.hadoop.hive.ql.exec.RCFileMergeOperator;
 import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
 import org.apache.hadoop.hive.ql.exec.RowSchema;
 import org.apache.hadoop.hive.ql.exec.SMBMapJoinOperator;
@@ -100,6 +101,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.InputFormat;
 
 import java.io.Serializable;
@@ -578,6 +580,8 @@ public final class GenMapRedUtils {
     //This read entity is a direct read entity and not an indirect read (that is when
     // this is being read because it is a dependency of a view).
     boolean isDirectRead = (parentViewInfo == null);
+    PlanUtils.addInput(inputs,
+        new ReadEntity(parseCtx.getTopToTable().get(topOp), parentViewInfo, isDirectRead));
 
     for (Partition part : parts) {
       if (part.getTable().isPartitioned()) {
@@ -878,30 +882,6 @@ public final class GenMapRedUtils {
     }
   }
 
-  public static void internTableDesc(Task<?> task, Interner<TableDesc> interner) {
-
-    if (task instanceof ConditionalTask) {
-      for (Task tsk : ((ConditionalTask) task).getListTasks()) {
-        internTableDesc(tsk, interner);
-      }
-    } else if (task instanceof ExecDriver) {
-      MapredWork work = (MapredWork) task.getWork();
-      work.getMapWork().internTable(interner);
-    } else if (task != null && (task.getWork() instanceof TezWork)) {
-      TezWork work = (TezWork)task.getWork();
-      for (BaseWork w : work.getAllWorkUnsorted()) {
-        if (w instanceof MapWork) {
-          ((MapWork)w).internTable(interner);
-        }
-      }
-    }
-    if (task.getNumChild() > 0) {
-      for (Task childTask : task.getChildTasks()) {
-        internTableDesc(childTask, interner);
-      }
-    }
-  }
-
   /**
    * create a new plan and return.
    *
@@ -1527,7 +1507,7 @@ public final class GenMapRedUtils {
    *
    * @param fsInputDesc
    * @param finalName
-   * @param inputFormatClass
+   * @param inputFormatClass 
    * @return MergeWork if table is stored as RCFile or ORCFile,
    *         null otherwise
    */
@@ -1734,7 +1714,7 @@ public final class GenMapRedUtils {
           // There are separate configuration parameters to control whether to
           // merge for a map-only job
           // or for a map-reduce job
-          if (currTask.getWork() instanceof MapredWork) {
+          if (currTask.getWork() instanceof MapredWork) {  
             ReduceWork reduceWork = ((MapredWork) currTask.getWork()).getReduceWork();
             boolean mergeMapOnly =
               hconf.getBoolVar(ConfVars.HIVEMERGEMAPFILES) && reduceWork == null;
@@ -1833,7 +1813,7 @@ public final class GenMapRedUtils {
     return Collections.emptyList();
   }
 
-  public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey)
+  public static List<Path> getInputPathsForPartialScan(QBParseInfo parseInfo, StringBuffer aggregationKey) 
     throws SemanticException {
     List<Path> inputPaths = new ArrayList<Path>();
     switch (parseInfo.getTableSpec().specType) {
@@ -1870,7 +1850,6 @@ public final class GenMapRedUtils {
   public static Set<Operator<?>> findTopOps(Operator<?> startOp, final Class<?> clazz) {
     final Set<Operator<?>> operators = new LinkedHashSet<Operator<?>>();
     OperatorUtils.iterateParents(startOp, new NodeUtils.Function<Operator<?>>() {
-      @Override
       public void apply(Operator<?> argument) {
         if (argument.getNumParent() == 0 && (clazz == null || clazz.isInstance(argument))) {
           operators.add(argument);
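
The single added line in the GenMapRedUtils hunk above registers the query's top-level table itself as a read entity, flagged as a direct read only when the scan is not being performed on behalf of a view (parentViewInfo == null). A rough, self-contained illustration of that direct-versus-indirect bookkeeping follows; the Read class is a simplified stand-in, not Hive's ReadEntity.

import java.util.LinkedHashSet;
import java.util.Set;

public class ReadEntitySketch {
  // Simplified stand-in for a read entity: a table plus how it was reached.
  static class Read {
    final String table;
    final String parentView;   // null when the query scans the table directly
    final boolean directRead;  // false when the scan only exists to serve a view
    Read(String table, String parentView) {
      this.table = table;
      this.parentView = parentView;
      this.directRead = (parentView == null);
    }
    @Override public String toString() {
      return table + (directRead ? " [direct]" : " [via " + parentView + "]");
    }
  }

  public static void main(String[] args) {
    Set<Read> inputs = new LinkedHashSet<Read>();
    inputs.add(new Read("sales", null));          // scanned by the query itself
    inputs.add(new Read("sales", "sales_view"));  // scanned as a dependency of a view
    for (Read r : inputs) {
      System.out.println(r);
    }
  }
}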

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/MapJoinProcessor.java Mon Oct  6 03:44:13 2014
@@ -389,8 +389,157 @@ public class MapJoinProcessor implements
       JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin)
       throws SemanticException {
 
-    MapJoinDesc mapJoinDescriptor =
-        getMapJoinDesc(hconf, opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin);
+    JoinDesc desc = op.getConf();
+    JoinCondDesc[] condns = desc.getConds();
+    Byte[] tagOrder = desc.getTagOrder();
+
+    // outer join cannot be performed on a table which is being cached
+    if (!noCheckOuterJoin) {
+      if (checkMapJoin(mapJoinPos, condns) < 0) {
+        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
+      }
+    }
+
+    // Walk over all the sources (which are guaranteed to be reduce sink
+    // operators).
+    // The join outputs a concatenation of all the inputs.
+    QBJoinTree leftSrc = joinTree.getJoinSrc();
+    List<ReduceSinkOperator> oldReduceSinkParentOps =
+        new ArrayList<ReduceSinkOperator>(op.getNumParent());
+    if (leftSrc != null) {
+      // assert mapJoinPos == 0;
+      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
+      assert parentOp.getParentOperators().size() == 1;
+      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+    }
+
+
+    byte pos = 0;
+    for (String src : joinTree.getBaseSrc()) {
+      if (src != null) {
+        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
+        assert parentOp.getParentOperators().size() == 1;
+        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
+      }
+      pos++;
+    }
+
+    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
+    List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
+    Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
+    Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
+      byte tag = entry.getKey();
+      Operator<?> terminal = oldReduceSinkParentOps.get(tag);
+
+      List<ExprNodeDesc> values = entry.getValue();
+      List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
+      newValueExprs.put(tag, newValues);
+      for (int i = 0; i < schema.size(); i++) {
+        ColumnInfo column = schema.get(i);
+        if (column == null) {
+          continue;
+        }
+        ExprNodeDesc expr = colExprMap.get(column.getInternalName());
+        int index = ExprNodeDescUtils.indexOf(expr, values);
+        if (index >= 0) {
+          colExprMap.put(column.getInternalName(), newValues.get(index));
+          schema.set(i, null);
+        }
+      }
+    }
+
+    // rewrite value index for mapjoin
+    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
+
+    // get the join keys from old parent ReduceSink operators
+    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
+
+    // construct valueTableDescs and valueFilteredTableDescs
+    List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
+    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
+    int[][] filterMap = desc.getFilterMap();
+    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
+      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
+      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
+      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
+      if (pos != mapJoinPos) {
+        // remove values in key exprs for value table schema
+        // value expression for hashsink will be modified in LocalMapJoinProcessor
+        int[] valueIndex = new int[valueCols.size()];
+        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
+        for (int i = 0; i < valueIndex.length; i++) {
+          ExprNodeDesc expr = valueCols.get(i);
+          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
+          if (kindex >= 0) {
+            valueIndex[i] = kindex;
+          } else {
+            valueIndex[i] = -valueColsInValueExpr.size() - 1;
+            valueColsInValueExpr.add(expr);
+          }
+        }
+        if (needValueIndex(valueIndex)) {
+          valueIndices.put(pos, valueIndex);
+        }
+        valueCols = valueColsInValueExpr;
+      }
+      // deep copy expr node desc
+      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
+      if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
+        ExprNodeColumnDesc isFilterDesc = new ExprNodeColumnDesc(TypeInfoFactory
+            .getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter", "filter", false);
+        valueFilteredCols.add(isFilterDesc);
+      }
+
+      TableDesc valueTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
+          .getFieldSchemasFromColumnList(valueCols, "mapjoinvalue"));
+      TableDesc valueFilteredTableDesc = PlanUtils.getMapJoinValueTableDesc(PlanUtils
+          .getFieldSchemasFromColumnList(valueFilteredCols, "mapjoinvalue"));
+
+      valueTableDescs.add(valueTableDesc);
+      valueFilteredTableDescs.add(valueFilteredTableDesc);
+
+      keyExprMap.put(pos, keyCols);
+    }
+
+    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
+    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
+    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
+      byte srcTag = entry.getKey();
+      List<ExprNodeDesc> filter = entry.getValue();
+
+      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
+      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
+    }
+    desc.setFilters(filters = newFilters);
+
+    // create dumpfile prefix needed to create descriptor
+    String dumpFilePrefix = "";
+    if (joinTree.getMapAliases() != null) {
+      for (String mapAlias : joinTree.getMapAliases()) {
+        dumpFilePrefix = dumpFilePrefix + mapAlias;
+      }
+      dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+    } else {
+      dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
+    }
+
+    List<ExprNodeDesc> keyCols = keyExprMap.get((byte)mapJoinPos);
+
+    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
+    TableDesc keyTableDesc = PlanUtils.getMapJoinKeyTableDesc(hconf,
+        PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
+    JoinCondDesc[] joinCondns = op.getConf().getConds();
+    MapJoinDesc mapJoinDescriptor = new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs,
+        valueTableDescs, valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns,
+        filters, op.getConf().getNoOuterJoin(), dumpFilePrefix);
+    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
+    mapJoinDescriptor.setTagOrder(tagOrder);
+    mapJoinDescriptor.setNullSafes(desc.getNullSafes());
+    mapJoinDescriptor.setFilterMap(desc.getFilterMap());
+    if (!valueIndices.isEmpty()) {
+      mapJoinDescriptor.setValueIndices(valueIndices);
+    }
 
     // reduce sink row resolver used to generate map join op
     RowResolver outputRS = opParseCtxMap.get(op).getRowResolver();
@@ -402,7 +551,6 @@ public class MapJoinProcessor implements
     opParseCtxMap.put(mapJoinOp, ctx);
 
     mapJoinOp.getConf().setReversedExprs(op.getConf().getReversedExprs());
-    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
     mapJoinOp.setColumnExprMap(colExprMap);
 
     List<Operator<? extends OperatorDesc>> childOps = op.getChildOperators();
@@ -1028,168 +1176,4 @@ public class MapJoinProcessor implements
     }
 
   }
-
-  public static MapJoinDesc getMapJoinDesc(HiveConf hconf,
-      LinkedHashMap<Operator<? extends OperatorDesc>, OpParseContext> opParseCtxMap,
-      JoinOperator op, QBJoinTree joinTree, int mapJoinPos, boolean noCheckOuterJoin) throws SemanticException {
-    JoinDesc desc = op.getConf();
-    JoinCondDesc[] condns = desc.getConds();
-    Byte[] tagOrder = desc.getTagOrder();
-
-    // outer join cannot be performed on a table which is being cached
-    if (!noCheckOuterJoin) {
-      if (checkMapJoin(mapJoinPos, condns) < 0) {
-        throw new SemanticException(ErrorMsg.NO_OUTER_MAPJOIN.getMsg());
-      }
-    }
-
-    // Walk over all the sources (which are guaranteed to be reduce sink
-    // operators).
-    // The join outputs a concatenation of all the inputs.
-    QBJoinTree leftSrc = joinTree.getJoinSrc();
-    List<ReduceSinkOperator> oldReduceSinkParentOps =
-        new ArrayList<ReduceSinkOperator>(op.getNumParent());
-    if (leftSrc != null) {
-      // assert mapJoinPos == 0;
-      Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(0);
-      assert parentOp.getParentOperators().size() == 1;
-      oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
-    }
-
-    byte pos = 0;
-    for (String src : joinTree.getBaseSrc()) {
-      if (src != null) {
-        Operator<? extends OperatorDesc> parentOp = op.getParentOperators().get(pos);
-        assert parentOp.getParentOperators().size() == 1;
-        oldReduceSinkParentOps.add((ReduceSinkOperator) parentOp);
-      }
-      pos++;
-    }
-
-    Map<String, ExprNodeDesc> colExprMap = op.getColumnExprMap();
-    List<ColumnInfo> schema = new ArrayList<ColumnInfo>(op.getSchema().getSignature());
-    Map<Byte, List<ExprNodeDesc>> valueExprs = op.getConf().getExprs();
-    Map<Byte, List<ExprNodeDesc>> newValueExprs = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : valueExprs.entrySet()) {
-      byte tag = entry.getKey();
-      Operator<?> terminal = oldReduceSinkParentOps.get(tag);
-
-      List<ExprNodeDesc> values = entry.getValue();
-      List<ExprNodeDesc> newValues = ExprNodeDescUtils.backtrack(values, op, terminal);
-      newValueExprs.put(tag, newValues);
-      for (int i = 0; i < schema.size(); i++) {
-        ColumnInfo column = schema.get(i);
-        if (column == null) {
-          continue;
-        }
-        ExprNodeDesc expr = colExprMap.get(column.getInternalName());
-        int index = ExprNodeDescUtils.indexOf(expr, values);
-        if (index >= 0) {
-          colExprMap.put(column.getInternalName(), newValues.get(index));
-          schema.set(i, null);
-        }
-      }
-    }
-
-    // rewrite value index for mapjoin
-    Map<Byte, int[]> valueIndices = new HashMap<Byte, int[]>();
-
-    // get the join keys from old parent ReduceSink operators
-    Map<Byte, List<ExprNodeDesc>> keyExprMap = new HashMap<Byte, List<ExprNodeDesc>>();
-
-    // construct valueTableDescs and valueFilteredTableDescs
-    List<TableDesc> valueTableDescs = new ArrayList<TableDesc>();
-    List<TableDesc> valueFilteredTableDescs = new ArrayList<TableDesc>();
-    int[][] filterMap = desc.getFilterMap();
-    for (pos = 0; pos < op.getParentOperators().size(); pos++) {
-      ReduceSinkOperator inputRS = oldReduceSinkParentOps.get(pos);
-      List<ExprNodeDesc> keyCols = inputRS.getConf().getKeyCols();
-      List<ExprNodeDesc> valueCols = newValueExprs.get(pos);
-      if (pos != mapJoinPos) {
-        // remove values in key exprs for value table schema
-        // value expression for hashsink will be modified in
-        // LocalMapJoinProcessor
-        int[] valueIndex = new int[valueCols.size()];
-        List<ExprNodeDesc> valueColsInValueExpr = new ArrayList<ExprNodeDesc>();
-        for (int i = 0; i < valueIndex.length; i++) {
-          ExprNodeDesc expr = valueCols.get(i);
-          int kindex = ExprNodeDescUtils.indexOf(expr, keyCols);
-          if (kindex >= 0) {
-            valueIndex[i] = kindex;
-          } else {
-            valueIndex[i] = -valueColsInValueExpr.size() - 1;
-            valueColsInValueExpr.add(expr);
-          }
-        }
-        if (needValueIndex(valueIndex)) {
-          valueIndices.put(pos, valueIndex);
-        }
-        valueCols = valueColsInValueExpr;
-      }
-      // deep copy expr node desc
-      List<ExprNodeDesc> valueFilteredCols = ExprNodeDescUtils.clone(valueCols);
-      if (filterMap != null && filterMap[pos] != null && pos != mapJoinPos) {
-        ExprNodeColumnDesc isFilterDesc =
-            new ExprNodeColumnDesc(
-                TypeInfoFactory.getPrimitiveTypeInfo(serdeConstants.SMALLINT_TYPE_NAME), "filter",
-                "filter", false);
-        valueFilteredCols.add(isFilterDesc);
-      }
-
-      TableDesc valueTableDesc =
-          PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(valueCols,
-              "mapjoinvalue"));
-      TableDesc valueFilteredTableDesc =
-          PlanUtils.getMapJoinValueTableDesc(PlanUtils.getFieldSchemasFromColumnList(
-              valueFilteredCols, "mapjoinvalue"));
-
-      valueTableDescs.add(valueTableDesc);
-      valueFilteredTableDescs.add(valueFilteredTableDesc);
-
-      keyExprMap.put(pos, keyCols);
-    }
-
-    Map<Byte, List<ExprNodeDesc>> filters = desc.getFilters();
-    Map<Byte, List<ExprNodeDesc>> newFilters = new HashMap<Byte, List<ExprNodeDesc>>();
-    for (Map.Entry<Byte, List<ExprNodeDesc>> entry : filters.entrySet()) {
-      byte srcTag = entry.getKey();
-      List<ExprNodeDesc> filter = entry.getValue();
-
-      Operator<?> terminal = oldReduceSinkParentOps.get(srcTag);
-      newFilters.put(srcTag, ExprNodeDescUtils.backtrack(filter, op, terminal));
-    }
-    desc.setFilters(filters = newFilters);
-
-    // create dumpfile prefix needed to create descriptor
-    String dumpFilePrefix = "";
-    if (joinTree.getMapAliases() != null) {
-      for (String mapAlias : joinTree.getMapAliases()) {
-        dumpFilePrefix = dumpFilePrefix + mapAlias;
-      }
-      dumpFilePrefix = dumpFilePrefix + "-" + PlanUtils.getCountForMapJoinDumpFilePrefix();
-    } else {
-      dumpFilePrefix = "mapfile" + PlanUtils.getCountForMapJoinDumpFilePrefix();
-    }
-
-    List<ExprNodeDesc> keyCols = keyExprMap.get((byte) mapJoinPos);
-
-    List<String> outputColumnNames = op.getConf().getOutputColumnNames();
-    TableDesc keyTableDesc =
-        PlanUtils.getMapJoinKeyTableDesc(hconf,
-            PlanUtils.getFieldSchemasFromColumnList(keyCols, MAPJOINKEY_FIELDPREFIX));
-    JoinCondDesc[] joinCondns = op.getConf().getConds();
-    MapJoinDesc mapJoinDescriptor =
-        new MapJoinDesc(keyExprMap, keyTableDesc, newValueExprs, valueTableDescs,
-            valueFilteredTableDescs, outputColumnNames, mapJoinPos, joinCondns, filters, op
-                .getConf().getNoOuterJoin(), dumpFilePrefix);
-    mapJoinDescriptor.setStatistics(op.getConf().getStatistics());
-    mapJoinDescriptor.setTagOrder(tagOrder);
-    mapJoinDescriptor.setNullSafes(desc.getNullSafes());
-    mapJoinDescriptor.setFilterMap(desc.getFilterMap());
-    if (!valueIndices.isEmpty()) {
-      mapJoinDescriptor.setValueIndices(valueIndices);
-    }
-
-    return mapJoinDescriptor;
-  }
 }
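
One piece of the conversion logic inlined above that is easy to misread is the value-index rewrite for the small (hash-side) tables: a value column that duplicates a join key is dropped from the value table schema and recorded as a non-negative index into the key columns, while a genuine value column is recorded as a negative index encoding its position in the compacted value list. The following self-contained sketch shows that encoding and the corresponding decoding, using plain strings in place of ExprNodeDesc; all names are illustrative.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ValueIndexSketch {
  // Encode: for each value column, a key index (>= 0) or -(position in compacted values) - 1.
  static int[] encode(List<String> keyCols, List<String> valueCols, List<String> compactedValues) {
    int[] valueIndex = new int[valueCols.size()];
    for (int i = 0; i < valueIndex.length; i++) {
      int kindex = keyCols.indexOf(valueCols.get(i));
      if (kindex >= 0) {
        valueIndex[i] = kindex;                       // reuse the key column
      } else {
        valueIndex[i] = -compactedValues.size() - 1;  // point into the value-only list
        compactedValues.add(valueCols.get(i));
      }
    }
    return valueIndex;
  }

  // Decode: rebuild the original value row from the key row plus the compacted value row.
  static List<String> decode(int[] valueIndex, List<String> keyRow, List<String> compactedRow) {
    List<String> row = new ArrayList<String>();
    for (int ix : valueIndex) {
      row.add(ix >= 0 ? keyRow.get(ix) : compactedRow.get(-ix - 1));
    }
    return row;
  }

  public static void main(String[] args) {
    List<String> keyCols = Arrays.asList("id", "dept");
    List<String> valueCols = Arrays.asList("id", "name", "dept", "salary");
    List<String> compacted = new ArrayList<String>();
    int[] valueIndex = encode(keyCols, valueCols, compacted);

    System.out.println(Arrays.toString(valueIndex));  // [0, -1, 1, -2]
    System.out.println(compacted);                    // [name, salary]
    System.out.println(decode(valueIndex, Arrays.asList("7", "eng"),
        Arrays.asList("alice", "100")));              // [7, alice, eng, 100]
  }
}

The patch's own comment notes that the hash sink's value expressions are adjusted later in LocalMapJoinProcessor; the decode step above is only meant to show what the index convention means.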

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Mon Oct  6 03:44:13 2014
@@ -51,12 +51,7 @@ public class Optimizer {
    * @param hiveConf
    */
   public void initialize(HiveConf hiveConf) {
-
-    boolean isTezExecEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez");
-    boolean bucketMapJoinOptimizer = false;
-
     transformations = new ArrayList<Transform>();
-
     // Add the transformation that computes the lineage information.
     transformations.add(new Generator());
     if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTPPD)) {
@@ -86,16 +81,15 @@ public class Optimizer {
     }
     transformations.add(new SamplePruner());
     transformations.add(new MapJoinProcessor());
-
-    if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) && !isTezExecEngine) {
+    boolean bucketMapJoinOptimizer = false;
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN)) {
       transformations.add(new BucketMapJoinOptimizer());
       bucketMapJoinOptimizer = true;
     }
 
     // If optimize hive.optimize.bucketmapjoin.sortedmerge is set, add both
     // BucketMapJoinOptimizer and SortedMergeBucketMapJoinOptimizer
-    if ((HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN))
-        && !isTezExecEngine) {
+    if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN)) {
       if (!bucketMapJoinOptimizer) {
         // No need to add BucketMapJoinOptimizer twice
         transformations.add(new BucketMapJoinOptimizer());
@@ -125,7 +119,7 @@ public class Optimizer {
     if(HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTCORRELATION) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW) &&
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME) &&
-        !isTezExecEngine) {
+        !HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
       transformations.add(new CorrelationOptimizer());
     }
     if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
@@ -135,7 +129,7 @@ public class Optimizer {
       transformations.add(new StatsOptimizer());
     }
     String execEngine = HiveConf.getVar(hiveConf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
-    if (pctx.getContext().getExplain() && !"spark".equals(execEngine) && !"tez".equals(execEngine)) {
+    if ((pctx.getContext().getExplain() || "spark".equals(execEngine)) && !"tez".equals(execEngine)) {
       transformations.add(new AnnotateWithStatistics());
       transformations.add(new AnnotateWithOpTraits());
     }
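
The last hunk above changes when the statistics and operator-trait annotators are registered on this branch: they now run for explain plans or whenever the execution engine is spark, and still never for tez. A trivial standalone restatement of that predicate, with a hypothetical helper method name:

public class AnnotatorGateSketch {
  // Mirrors the new condition: (explain || spark) && !tez
  static boolean addStatsAnnotators(boolean explain, String execEngine) {
    return (explain || "spark".equals(execEngine)) && !"tez".equals(execEngine);
  }

  public static void main(String[] args) {
    System.out.println(addStatsAnnotators(false, "spark")); // true (new on this branch)
    System.out.println(addStatsAnnotators(true,  "mr"));    // true
    System.out.println(addStatsAnnotators(false, "mr"));    // false
    System.out.println(addStatsAnnotators(true,  "tez"));   // false
  }
}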

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ReduceSinkMapJoinProc.java Mon Oct  6 03:44:13 2014
@@ -52,7 +52,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty;
 import org.apache.hadoop.hive.ql.plan.TezEdgeProperty.EdgeType;
 import org.apache.hadoop.hive.ql.plan.TezWork;
-import org.apache.hadoop.hive.ql.plan.TezWork.VertexType;
 import org.apache.hadoop.hive.ql.stats.StatsUtils;
 
 public class ReduceSinkMapJoinProc implements NodeProcessor {
@@ -184,10 +183,7 @@ public class ReduceSinkMapJoinProc imple
         TezWork tezWork = context.currentTask.getWork();
         LOG.debug("connecting "+parentWork.getName()+" with "+myWork.getName());
         tezWork.connect(parentWork, myWork, edgeProp);
-        if (edgeType == EdgeType.CUSTOM_EDGE) {
-          tezWork.setVertexType(myWork, VertexType.INITIALIZED_EDGES);
-        }
-
+        
         ReduceSinkOperator r = null;
         if (parentRS.getConf().getOutputName() != null) {
           LOG.debug("Cloning reduce sink for multi-child broadcast edge");

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java?rev=1629562&r1=1629561&r2=1629562&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java Mon Oct  6 03:44:13 2014
@@ -44,9 +44,9 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
@@ -55,25 +55,13 @@ import org.apache.hadoop.hive.ql.parse.P
 import org.apache.hadoop.hive.ql.parse.QB;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.parse.SplitSample;
-import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
-import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
 import org.apache.hadoop.hive.ql.plan.ListSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
-import org.apache.hadoop.hive.ql.plan.SelectDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToBinary;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToChar;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDate;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToDecimal;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUnixTimeStamp;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToUtcTimestamp;
-import org.apache.hadoop.hive.ql.udf.generic.GenericUDFToVarchar;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobConf;
 
@@ -85,11 +73,9 @@ public class SimpleFetchOptimizer implem
 
   private final Log LOG = LogFactory.getLog(SimpleFetchOptimizer.class.getName());
 
-  @Override
   public ParseContext transform(ParseContext pctx) throws SemanticException {
     Map<String, Operator<? extends OperatorDesc>> topOps = pctx.getTopOps();
-    if (pctx.getQB().getIsQuery() && !pctx.getQB().getParseInfo().isAnalyzeCommand()
-        && topOps.size() == 1) {
+    if (pctx.getQB().isSimpleSelectQuery() && topOps.size() == 1) {
       // no join, no groupby, no distinct, no lateral view, no subq,
       // no CTAS or insert, not analyze command, and single sourced.
       String alias = (String) pctx.getTopOps().keySet().toArray()[0];
@@ -158,7 +144,7 @@ public class SimpleFetchOptimizer implem
   // for non-aggressive mode (minimal)
   // 1. samping is not allowed
   // 2. for partitioned table, all filters should be targeted to partition column
-  // 3. SelectOperator should use only simple cast/column access
+  // 3. SelectOperator should be select star
   private FetchData checkTree(boolean aggressive, ParseContext pctx, String alias,
       TableScanOperator ts) throws HiveException {
     SplitSample splitSample = pctx.getNameToSplitSample().get(alias);
@@ -170,7 +156,7 @@ public class SimpleFetchOptimizer implem
       return null;
     }
 
-    Table table = pctx.getTopToTable().get(ts);
+    Table table = qb.getMetaData().getAliasToTable().get(alias);
     if (table == null) {
       return null;
     }
@@ -195,71 +181,34 @@ public class SimpleFetchOptimizer implem
     return null;
   }
 
-  private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive,
+  private FetchData checkOperators(FetchData fetch, TableScanOperator ts, boolean aggressive,
       boolean bypassFilter) {
     if (ts.getChildOperators().size() != 1) {
       return null;
     }
     Operator<?> op = ts.getChildOperators().get(0);
     for (; ; op = op.getChildOperators().get(0)) {
-      if (op instanceof SelectOperator) {
-        if (!aggressive) {
-          if (!checkExpressions((SelectOperator) op)) {
-            break;
-          }
-        }
-        continue;
-      }
-
-      if (aggressive) {
-        if (!(op instanceof LimitOperator || op instanceof FilterOperator)) {
+      if (aggressive) {
+        if (!(op instanceof LimitOperator || op instanceof FilterOperator
+            || op instanceof SelectOperator)) {
           break;
         }
-      } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter))) {
+      } else if (!(op instanceof LimitOperator || (op instanceof FilterOperator && bypassFilter)
+          || (op instanceof SelectOperator && ((SelectOperator) op).getConf().isSelectStar()))) {
         break;
       }
-
       if (op.getChildOperators() == null || op.getChildOperators().size() != 1) {
         return null;
       }
     }
-
     if (op instanceof FileSinkOperator) {
       fetch.scanOp = ts;
       fetch.fileSink = op;
       return fetch;
     }
-
     return null;
   }
 
-  private boolean checkExpressions(SelectOperator op) {
-    SelectDesc desc = op.getConf();
-    for (ExprNodeDesc expr : desc.getColList()) {
-      if (!checkExpression(expr)) {
-        return false;
-      }
-    }
-    return true;
-  }
-
-  private boolean checkExpression(ExprNodeDesc expr) {
-    if (expr instanceof ExprNodeConstantDesc || expr instanceof ExprNodeColumnDesc) {
-      return true;
-    }
-
-    if (expr instanceof ExprNodeGenericFuncDesc) {
-      GenericUDF udf = ((ExprNodeGenericFuncDesc) expr).getGenericUDF();
-      if (udf instanceof GenericUDFToBinary || udf instanceof GenericUDFToChar
-          || udf instanceof GenericUDFToDate || udf instanceof GenericUDFToDecimal
-          || udf instanceof GenericUDFToUnixTimeStamp || udf instanceof GenericUDFToUtcTimestamp
-          || udf instanceof GenericUDFToVarchar) {
-        return expr.getChildren().size() == 1 && checkExpression(expr.getChildren().get(0));
-      }
-    }
-    return false;
-  }
-
   private class FetchData {
 
     private final ReadEntity parent;
@@ -291,7 +240,7 @@ public class SimpleFetchOptimizer implem
       this.splitSample = splitSample;
       this.onlyPruningFilter = bypassFilter;
     }
-
+    
     /*
      * all filters were executed during partition pruning
      */
@@ -302,7 +251,7 @@ public class SimpleFetchOptimizer implem
     private FetchWork convertToWork() throws HiveException {
       inputs.clear();
       if (!table.isPartitioned()) {
-        inputs.add(new ReadEntity(table, parent, parent == null));
+        inputs.add(new ReadEntity(table, parent));
         FetchWork work = new FetchWork(table.getPath(), Utilities.getTableDesc(table));
         PlanUtils.configureInputJobPropertiesForStorageHandler(work.getTblDesc());
         work.setSplitSample(splitSample);
@@ -312,12 +261,12 @@ public class SimpleFetchOptimizer implem
       List<PartitionDesc> partP = new ArrayList<PartitionDesc>();
 
       for (Partition partition : partsList.getNotDeniedPartns()) {
-        inputs.add(new ReadEntity(partition, parent, parent == null));
+        inputs.add(new ReadEntity(partition, parent));
         listP.add(partition.getDataLocation());
         partP.add(Utilities.getPartitionDesc(partition));
       }
       Table sourceTable = partsList.getSourceTable();
-      inputs.add(new ReadEntity(sourceTable, parent, parent == null));
+      inputs.add(new ReadEntity(sourceTable, parent));
       TableDesc table = Utilities.getTableDesc(sourceTable);
       FetchWork work = new FetchWork(listP, partP, table);
       if (!work.getPartDesc().isEmpty()) {
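
To summarize the checkOperators logic restored above: starting below the table scan, the optimizer walks a strictly single-child operator chain and, in the non-aggressive (minimal) mode, tolerates only limit operators, filter operators whose predicates were fully handled by partition pruning, and select-star projections; the query qualifies for a direct fetch only if that walk ends at a file sink. Below is a self-contained sketch of that chain check using made-up operator kinds rather than Hive's operator classes.

import java.util.Arrays;
import java.util.List;

public class FetchChainSketch {
  enum Kind { SELECT_STAR, SELECT_EXPR, FILTER, LIMIT, FILE_SINK, GROUP_BY }

  // Non-aggressive check: every operator between the scan and the sink must be benign,
  // and the chain must terminate in a file sink.
  static boolean qualifiesForSimpleFetch(List<Kind> chain, boolean bypassFilter) {
    for (int i = 0; i < chain.size(); i++) {
      Kind op = chain.get(i);
      if (op == Kind.FILE_SINK) {
        return i == chain.size() - 1;        // sink must be the last operator
      }
      boolean allowed = op == Kind.LIMIT
          || (op == Kind.FILTER && bypassFilter)
          || op == Kind.SELECT_STAR;
      if (!allowed) {
        return false;
      }
    }
    return false;                            // never reached a file sink
  }

  public static void main(String[] args) {
    // select * ... limit N -> eligible
    System.out.println(qualifiesForSimpleFetch(
        Arrays.asList(Kind.SELECT_STAR, Kind.LIMIT, Kind.FILE_SINK), true));     // true
    // projection with expressions -> not eligible in minimal mode on this branch
    System.out.println(qualifiesForSimpleFetch(
        Arrays.asList(Kind.SELECT_EXPR, Kind.FILE_SINK), true));                 // false
    // aggregation -> not eligible
    System.out.println(qualifiesForSimpleFetch(
        Arrays.asList(Kind.GROUP_BY, Kind.SELECT_STAR, Kind.FILE_SINK), true));  // false
  }
}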