Posted to commits@hive.apache.org by am...@apache.org on 2013/04/26 06:59:58 UTC

svn commit: r1476039 [8/22] - in /hive/branches/HIVE-4115: ./ beeline/ beeline/src/java/org/apache/hive/beeline/ bin/ builtins/ cli/ common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/ eclipse-templates/ hbase-handler/ hbase-handler/src/java...

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Fri Apr 26 04:59:50 2013
@@ -40,10 +40,10 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
+import java.util.Map.Entry;
 
 import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.commons.lang.StringUtils;
@@ -86,6 +86,8 @@ import org.apache.hadoop.hive.ql.hooks.R
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.BlockMergeTask;
 import org.apache.hadoop.hive.ql.io.rcfile.merge.MergeWork;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateTask;
+import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateWork;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockMode;
@@ -111,8 +113,8 @@ import org.apache.hadoop.hive.ql.plan.Al
 import org.apache.hadoop.hive.ql.plan.AlterIndexDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableAlterPartDesc;
 import org.apache.hadoop.hive.ql.plan.AlterTableDesc;
-import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
 import org.apache.hadoop.hive.ql.plan.AlterTableSimpleDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableExchangePartition;
 import org.apache.hadoop.hive.ql.plan.CreateDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.CreateIndexDesc;
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc;
@@ -150,6 +152,7 @@ import org.apache.hadoop.hive.ql.plan.Sh
 import org.apache.hadoop.hive.ql.plan.SwitchDatabaseDesc;
 import org.apache.hadoop.hive.ql.plan.TruncateTableDesc;
 import org.apache.hadoop.hive.ql.plan.UnlockTableDesc;
+import org.apache.hadoop.hive.ql.plan.AlterTableDesc.AlterTableTypes;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.hive.ql.security.authorization.Privilege;
 import org.apache.hadoop.hive.serde.serdeConstants;
@@ -429,6 +432,12 @@ public class DDLTask extends Task<DDLWor
         return truncateTable(db, truncateTableDesc);
       }
 
+      AlterTableExchangePartition alterTableExchangePartition =
+        work.getAlterTableExchangePartition();
+      if (alterTableExchangePartition != null) {
+        return exchangeTablePartition(db, alterTableExchangePartition);
+      }
+
     } catch (InvalidTableException e) {
       formatter.consoleError(console, "Table " + e.getTableName() + " does not exist",
                              formatter.MISSING);
@@ -3951,6 +3960,21 @@ public class DDLTask extends Task<DDLWor
   }
 
   private int truncateTable(Hive db, TruncateTableDesc truncateTableDesc) throws HiveException {
+
+    if (truncateTableDesc.getColumnIndexes() != null) {
+      ColumnTruncateWork truncateWork = new ColumnTruncateWork(
+          truncateTableDesc.getColumnIndexes(), truncateTableDesc.getInputDir(),
+          truncateTableDesc.getOutputDir());
+      truncateWork.setListBucketingCtx(truncateTableDesc.getLbCtx());
+      truncateWork.setMapperCannotSpanPartns(true);
+      DriverContext driverCxt = new DriverContext();
+      ColumnTruncateTask taskExec = new ColumnTruncateTask();
+      taskExec.initialize(db.getConf(), null, driverCxt);
+      taskExec.setWork(truncateWork);
+      taskExec.setQueryPlan(this.getQueryPlan());
+      return taskExec.execute(driverCxt);
+    }
+
     String tableName = truncateTableDesc.getTableName();
     Map<String, String> partSpec = truncateTableDesc.getPartSpec();
 
@@ -3969,6 +3993,17 @@ public class DDLTask extends Task<DDLWor
     return 0;
   }
 
+  private int exchangeTablePartition(Hive db,
+      AlterTableExchangePartition exchangePartition) throws HiveException {
+    Map<String, String> partitionSpecs = exchangePartition.getPartitionSpecs();
+    Table destTable = exchangePartition.getDestinationTable();
+    Table sourceTable = exchangePartition.getSourceTable();
+    db.exchangeTablePartitions(partitionSpecs, sourceTable.getDbName(),
+        sourceTable.getTableName(), destTable.getDbName(),
+        destTable.getTableName());
+    return 0;
+  }
+
   private List<Path> getLocations(Hive db, Table table, Map<String, String> partSpec)
       throws HiveException {
     List<Path> locations = new ArrayList<Path>();

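The truncateTable change above is also an example of running a standalone MapReduce task from inside a DDL task, without going back through the Driver: build the work descriptor, create a fresh DriverContext, and execute the task inline. Condensed from the diff, with explanatory comments (error handling omitted):

    // Sketch of DDLTask.truncateTable's new column-truncation branch.
    ColumnTruncateWork truncateWork = new ColumnTruncateWork(
        truncateTableDesc.getColumnIndexes(),       // columns to null out
        truncateTableDesc.getInputDir(),
        truncateTableDesc.getOutputDir());
    truncateWork.setListBucketingCtx(truncateTableDesc.getLbCtx());
    truncateWork.setMapperCannotSpanPartns(true);   // one partition per mapper
    DriverContext driverCxt = new DriverContext();  // standalone context, no parent Driver
    ColumnTruncateTask taskExec = new ColumnTruncateTask();
    taskExec.initialize(db.getConf(), null, driverCxt); // no QueryPlan at init time
    taskExec.setWork(truncateWork);
    taskExec.setQueryPlan(this.getQueryPlan());     // reuse the DDL task's plan
    int rc = taskExec.execute(driverCxt);           // 0 on success
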
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Fri Apr 26 04:59:50 2013
@@ -412,7 +412,7 @@ public class ExecDriver extends Task<Map
           LOG.info("Add 1 archive file to distributed cache. Archive file: " + hdfsFilePath.toUri());
         }
       }
-
+      work.configureJobConf(job);
       addInputPaths(job, work, emptyScratchDirStr, ctx);
 
       Utilities.setMapRedWork(job, work, ctx.getMRTmpFileURI());

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecMapper.java Fri Apr 26 04:59:50 2013
@@ -50,7 +50,6 @@ public class ExecMapper extends MapReduc
   private JobConf jc;
   private boolean abort = false;
   private Reporter rp;
-  private List<OperatorHook> opHooks;
   public static final Log l4j = LogFactory.getLog("ExecMapper");
   private static boolean done;
 
@@ -99,7 +98,6 @@ public class ExecMapper extends MapReduc
       mo.setExecContext(execContext);
       mo.initializeLocalWork(jc);
       mo.initialize(jc, null);
-      opHooks = OperatorHookUtils.getOperatorHooks(jc);
 
       if (localWork == null) {
         return;
@@ -132,7 +130,6 @@ public class ExecMapper extends MapReduc
       rp = reporter;
       mo.setOutputCollector(oc);
       mo.setReporter(rp);
-      mo.setOperatorHooks(opHooks);
       MapredContext.get().setReporter(reporter);
     }
     // reset the execContext for each new row

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecReducer.java Fri Apr 26 04:59:50 2013
@@ -66,7 +66,6 @@ public class ExecReducer extends MapRedu
   private long nextCntr = 1;
 
   private static String[] fieldNames;
-  private List<OperatorHook> opHooks;
   public static final Log l4j = LogFactory.getLog("ExecReducer");
   private boolean isLogInfoEnabled = false;
 
@@ -152,7 +151,6 @@ public class ExecReducer extends MapRedu
     try {
       l4j.info(reducer.dump(0));
       reducer.initialize(jc, rowObjectInspector);
-      opHooks = OperatorHookUtils.getOperatorHooks(jc);
     } catch (Throwable e) {
       abort = true;
       if (e instanceof OutOfMemoryError) {
@@ -183,7 +181,6 @@ public class ExecReducer extends MapRedu
       rp = reporter;
       reducer.setOutputCollector(oc);
       reducer.setReporter(rp);
-      reducer.setOperatorHooks(opHooks);
       MapredContext.get().setReporter(reporter);
     }
 

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Fri Apr 26 04:59:50 2013
@@ -755,7 +755,9 @@ public class FileSinkOperator extends Te
         // check # of dp
         if (valToPaths.size() > maxPartitions) {
           // throw fatal error
-          incrCounter(fatalErrorCntr, 1);
+          if (counterNameToEnum != null) {
+            incrCounter(fatalErrorCntr, 1);
+          }
           fatalError = true;
           LOG.error("Fatal error was thrown due to exceeding number of dynamic partitions");
         }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Apr 26 04:59:50 2013
@@ -58,12 +58,12 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObject;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
@@ -90,8 +90,6 @@ public class GroupByOperator extends Ope
   protected transient ObjectInspector[][] aggregationParameterObjectInspectors;
   protected transient ObjectInspector[][] aggregationParameterStandardObjectInspectors;
   protected transient Object[][] aggregationParameterObjects;
-  // In the future, we may allow both count(DISTINCT a) and sum(DISTINCT a) in
-  // the same SQL clause,
   // so aggregationIsDistinct is a boolean array instead of a single number.
   protected transient boolean[] aggregationIsDistinct;
   // Map from integer tag to distinct aggrs
@@ -887,8 +885,15 @@ public class GroupByOperator extends Ope
 
     // Forward the current keys if needed for sort-based aggregation
     if (currentKeys != null && !keysAreEqual) {
-      forward(currentKeys.getKeyArray(), aggregations);
-      countAfterReport = 0;
+      // This is to optimize queries of the form:
+      // select count(distinct key) from T
+      // where T is sorted and bucketized by key
+      // Partial aggregation is performed on the mapper, and the
+      // reducer gets 1 row (partial result) per mapper.
+      if (!conf.isDontResetAggrsDistinct()) {
+        forward(currentKeys.getKeyArray(), aggregations);
+        countAfterReport = 0;
+      }
     }
 
     // Need to update the keys?
@@ -900,7 +905,10 @@ public class GroupByOperator extends Ope
       }
 
       // Reset the aggregations
-      resetAggregations(aggregations);
+      // For distincts optimization with sorting/bucketing, perform partial aggregation
+      if (!conf.isDontResetAggrsDistinct()) {
+        resetAggregations(aggregations);
+      }
 
       // clear parameters in last-invoke
       for (int i = 0; i < aggregationsParametersLastInvoke.length; i++) {
@@ -1076,7 +1084,7 @@ public class GroupByOperator extends Ope
       try {
         // put the hash related stats in statsMap if applicable, so that they
         // are sent to jt as counters
-        if (hashAggr) {
+        if (hashAggr && counterNameToEnum != null) {
           incrCounter(counterNameHashOut, numRowsHashTbl);
         }
 

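The two isDontResetAggrsDistinct() guards above implement the sorted/bucketed count(distinct) optimization: when the flag is set, the mapper neither forwards nor resets aggregations at key boundaries, so one partial aggregate accumulates until close and the reducer receives a single row per mapper. A self-contained toy model of the two modes (names here are illustrative, not Hive code):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;

    public class SortedDistinctDemo {
      public static void main(String[] args) {
        List<String> sortedKeys = Arrays.asList("a", "a", "b", "b", "c");
        // Default mode: forward + reset at every key change -> 3 rows to the reducer.
        int forwarded = 0;
        String prev = null;
        for (String key : sortedKeys) {
          if (prev != null && !prev.equals(key)) {
            forwarded++;             // forward(currentKeys, aggregations), then reset
          }
          prev = key;
        }
        forwarded++;                 // the last group is flushed at close
        // Optimized mode: never forward/reset per key; one partial count survives.
        int distinct = new HashSet<String>(sortedKeys).size();
        System.out.println(forwarded + " rows vs 1 row carrying count=" + distinct);
      }
    }
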
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Fri Apr 26 04:59:50 2013
@@ -54,6 +54,7 @@ public class HashTableSinkOperator exten
   private static final long serialVersionUID = 1L;
   private static final Log LOG = LogFactory.getLog(HashTableSinkOperator.class.getName());
 
+  protected static MapJoinMetaData metadata = new MapJoinMetaData();
   // from abstract map join operator
   /**
    * The expressions for join inputs's join keys.
@@ -164,6 +165,10 @@ public class HashTableSinkOperator exten
 
   }
 
+  public static MapJoinMetaData getMetadata() {
+    return metadata;
+  }
+
   private static final transient String[] FATAL_ERR_MSG = {
       null, // counter value 0 means no error
       "Mapside join exceeds available memory. "
@@ -301,8 +306,7 @@ public class HashTableSinkOperator exten
         null);
     keySerializer.initialize(null, keyTableDesc.getProperties());
 
-    MapJoinMetaData.clear();
-    MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+    metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
         ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
             ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
   }
@@ -349,7 +353,8 @@ public class HashTableSinkOperator exten
 
         // Construct externalizable objects for key and value
         if (needNewKey) {
-          MapJoinObjectValue valueObj = new MapJoinObjectValue(metadataValueTag[tag], res);
+          MapJoinObjectValue valueObj = new MapJoinObjectValue(
+              metadataValueTag[tag], res);
 
           rowNumber++;
           if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
@@ -391,7 +396,7 @@ public class HashTableSinkOperator exten
         .getStandardStructObjectInspector(newNames, newFields);
 
     int alias = Integer.valueOf(metadataValueTag[tag]);
-    MapJoinMetaData.put(alias, new HashTableSinkObjectCtx(
+    metadata.put(Integer.valueOf(metadataValueTag[tag]), new HashTableSinkObjectCtx(
         standardOI, valueSerDe, valueTableDesc, hasFilter(alias), hconf));
   }
 
@@ -435,7 +440,7 @@ public class HashTableSinkOperator exten
 
       super.closeOp(abort);
     } catch (Exception e) {
-      LOG.error("Generate Hashtable error");
+      LOG.error("Generate Hashtable error", e);
       e.printStackTrace();
     }
   }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Fri Apr 26 04:59:50 2013
@@ -269,4 +269,14 @@ public class JoinOperator extends Common
     // optimizations for now.
     return false;
   }
+
+  @Override
+  public boolean opAllowedBeforeSortMergeJoin() {
+    // If a join occurs before the sort-merge join, it is not useful to convert the sort-merge
+    // join to a mapjoin. It might be simpler to perform the join and then the sort-merge
+    // join. By converting the sort-merge join to a map-join, the job would be executed as 2
+    // mapjoins in the best case. The number of inputs for the join is more than 1, so it would
+    // be difficult to figure out the big table for the mapjoin.
+    return false;
+  }
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinMetaData.java Fri Apr 26 04:59:50 2013
@@ -24,20 +24,21 @@ import java.util.Map;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
 
 public class MapJoinMetaData {
-  static transient Map<Integer, HashTableSinkObjectCtx> mapMetadata = new HashMap<Integer, HashTableSinkObjectCtx>();
+  transient Map<Integer, HashTableSinkObjectCtx> mapMetadata =
+      new HashMap<Integer, HashTableSinkObjectCtx>();
   static ArrayList<Object> list = new ArrayList<Object>();
 
   public MapJoinMetaData(){
 
   }
-  public static void put(Integer key, HashTableSinkObjectCtx value){
+  public void put(Integer key, HashTableSinkObjectCtx value){
     mapMetadata.put(key, value);
   }
-  public static HashTableSinkObjectCtx get(Integer key){
+  public HashTableSinkObjectCtx get(Integer key){
     return mapMetadata.get(key);
   }
 
-  public static void clear(){
+  public void clear(){
     mapMetadata.clear();
   }
 
@@ -45,5 +46,4 @@ public class MapJoinMetaData {
     list.clear();
     return list;
   }
-
 }

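With the static map and static accessors gone, each side of the map-join now owns its own MapJoinMetaData instance: HashTableSinkOperator (the local task that serializes the small table) and MapJoinOperator (the map task that loads it back). The persistence classes further down pick the instance that matches their direction; a sketch of the split, condensed from those diffs:

    // Direction of the metadata lookup after this commit (see the
    // MapJoinObjectKey/MapJoinObjectValue/MapJoinSingleKey diffs below):
    //   writeExternal -> HashTableSinkOperator.getMetadata()  (small-table side)
    //   readExternal  -> MapJoinOperator.getMetadata()        (map-side probe)
    HashTableSinkObjectCtx writeCtx =
        HashTableSinkOperator.getMetadata().get(Integer.valueOf(metadataTag));
    HashTableSinkObjectCtx readCtx =
        MapJoinOperator.getMetadata().get(Integer.valueOf(metadataTag));
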
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java Fri Apr 26 04:59:50 2013
@@ -54,6 +54,11 @@ public class MapJoinOperator extends Abs
 
   protected transient HashMapWrapper<AbstractMapJoinKey, MapJoinObjectValue>[] mapJoinTables;
 
+  protected static MapJoinMetaData metadata = new MapJoinMetaData();
+  public static MapJoinMetaData getMetadata() {
+    return metadata;
+  }
+
   private static final transient String[] FATAL_ERR_MSG = {
       null, // counter value 0 means no error
       "Mapside join exceeds available memory. "
@@ -117,7 +122,7 @@ public class MapJoinOperator extends Abs
     SerDe keySerializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc.getDeserializerClass(),
         null);
     keySerializer.initialize(null, keyTableDesc.getProperties());
-    MapJoinMetaData.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
+    metadata.put(Integer.valueOf(metadataKeyTag), new HashTableSinkObjectCtx(
         ObjectInspectorUtils.getStandardObjectInspector(keySerializer.getObjectInspector(),
             ObjectInspectorCopyOption.WRITABLE), keySerializer, keyTableDesc, false, hconf));
 
@@ -136,7 +141,7 @@ public class MapJoinOperator extends Abs
       valueSerDe.initialize(null, valueTableDesc.getProperties());
 
       ObjectInspector inspector = valueSerDe.getObjectInspector();
-      MapJoinMetaData.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
+      metadata.put(Integer.valueOf(pos), new HashTableSinkObjectCtx(ObjectInspectorUtils
           .getStandardObjectInspector(inspector, ObjectInspectorCopyOption.WRITABLE),
           valueSerDe, valueTableDesc, hasFilter(pos), hconf));
     }
@@ -189,8 +194,8 @@ public class MapJoinOperator extends Abs
         hashtable.initilizePersistentHash(path.toUri().getPath());
       }
     } catch (Exception e) {
-      LOG.error("Load Distributed Cache Error");
-      throw new HiveException(e.getMessage());
+      LOG.error("Load Distributed Cache Error", e);
+      throw new HiveException(e);
     }
   }
 

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Fri Apr 26 04:59:50 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.ql.Context;
@@ -46,7 +47,6 @@ import org.apache.hadoop.hive.ql.io.rcfi
 import org.apache.hadoop.hive.ql.lockmgr.HiveLock;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockManager;
 import org.apache.hadoop.hive.ql.lockmgr.HiveLockObj;
-import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
@@ -394,7 +394,8 @@ public class MoveTask extends Task<MoveW
             }
             dc = null; // reset data container to prevent it being added again.
           } else { // static partitions
-            List<String> partVals = Hive.getPvals(table.getPartCols(), tbd.getPartitionSpec());
+            List<String> partVals = MetaStoreUtils.getPvals(table.getPartCols(),
+                tbd.getPartitionSpec());
             db.validatePartitionNameCharacters(partVals);
             db.loadPartition(new Path(tbd.getSourceDir()), tbd.getTable().getTableName(),
                 tbd.getPartitionSpec(), tbd.getReplace(), tbd.getHoldDDLTime(),

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Apr 26 04:59:50 2013
@@ -55,7 +55,6 @@ public abstract class Operator<T extends
   // Bean methods
 
   private static final long serialVersionUID = 1L;
-  List<OperatorHook> operatorHooks;
 
   private Configuration configuration;
   protected List<Operator<? extends OperatorDesc>> childOperators;
@@ -241,17 +240,6 @@ public abstract class Operator<T extends
     return id;
   }
 
-  public void setOperatorHooks(List<OperatorHook> opHooks){
-    operatorHooks = opHooks;
-    if (childOperators == null) {
-      return;
-    }
-
-    for (Operator<? extends OperatorDesc> op : childOperators) {
-      op.setOperatorHooks(opHooks);
-    }
-  }
-
   public void setReporter(Reporter rep) {
     reporter = rep;
 
@@ -436,34 +424,6 @@ public abstract class Operator<T extends
     }
   }
 
-  private void enterOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
-    if (this.operatorHooks == null) {
-      return;
-    }
-    for(OperatorHook opHook : this.operatorHooks) {
-      opHook.enter(opHookContext);
-    }
-  }
-
-  private void exitOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
-    if (this.operatorHooks == null) {
-      return;
-    }
-    for(OperatorHook opHook : this.operatorHooks) {
-      opHook.exit(opHookContext);
-    }
-  }
-
-  private void closeOperatorHooks(OperatorHookContext opHookContext) throws HiveException {
-    if (this.operatorHooks == null) {
-      return;
-    }
-    for(OperatorHook opHook : this.operatorHooks) {
-      opHook.close(opHookContext);
-    }
-  }
-
-
   /**
    * Collects all the parent's output object inspectors and calls actual
    * initialization method.
@@ -525,12 +485,22 @@ public abstract class Operator<T extends
     if (fatalError) {
       return;
     }
-    OperatorHookContext opHookContext = new OperatorHookContext(this, row, tag);
-    preProcessCounter();
-    enterOperatorHooks(opHookContext);
-    processOp(row, tag);
-    exitOperatorHooks(opHookContext);
-    postProcessCounter();
+
+    if (counterNameToEnum != null) {
+      inputRows++;
+      if ((inputRows % 1000) == 0) {
+        incrCounter(numInputRowsCntr, inputRows);
+        incrCounter(timeTakenCntr, totalTime);
+        inputRows = 0;
+        totalTime = 0;
+      }
+
+      beginTime = System.currentTimeMillis();
+      processOp(row, tag);
+      totalTime += (System.currentTimeMillis() - beginTime);
+    } else {
+      processOp(row, tag);
+    }
   }
 
   // If a operator wants to do some work at the beginning of a group
@@ -606,13 +576,14 @@ public abstract class Operator<T extends
     state = State.CLOSE;
     LOG.info(id + " finished. closing... ");
 
-    incrCounter(numInputRowsCntr, inputRows);
-    incrCounter(numOutputRowsCntr, outputRows);
-    incrCounter(timeTakenCntr, totalTime);
+    if (counterNameToEnum != null) {
+      incrCounter(numInputRowsCntr, inputRows);
+      incrCounter(numOutputRowsCntr, outputRows);
+      incrCounter(timeTakenCntr, totalTime);
+    }
 
     LOG.info(id + " forwarded " + cntr + " rows");
 
-    closeOperatorHooks(new OperatorHookContext(this));
     // call the operator specific close routine
     closeOp(abort);
 
@@ -822,9 +793,11 @@ public abstract class Operator<T extends
   protected void forward(Object row, ObjectInspector rowInspector)
       throws HiveException {
 
-    if ((++outputRows % 1000) == 0) {
-      incrCounter(numOutputRowsCntr, outputRows);
-      outputRows = 0;
+    if (counterNameToEnum != null) {
+      if ((++outputRows % 1000) == 0) {
+        incrCounter(numOutputRowsCntr, outputRows);
+        outputRows = 0;
+      }
     }
 
     if (isLogInfoEnabled) {
@@ -1158,39 +1131,12 @@ public abstract class Operator<T extends
   protected transient Object groupKeyObject;
 
   /**
-   * this is called before operator process to buffer some counters.
-   */
-  private void preProcessCounter() {
-    inputRows++;
-    if ((inputRows % 1000) == 0) {
-      incrCounter(numInputRowsCntr, inputRows);
-      incrCounter(timeTakenCntr, totalTime);
-      inputRows = 0;
-      totalTime = 0;
-    }
-    beginTime = System.currentTimeMillis();
-  }
-
-  /**
-   * this is called after operator process to buffer some counters.
-   */
-  private void postProcessCounter() {
-    if (counterNameToEnum != null) {
-      totalTime += (System.currentTimeMillis() - beginTime);
-    }
-  }
-
-  /**
    * this is called in operators in map or reduce tasks.
    *
    * @param name
    * @param amount
    */
   protected void incrCounter(String name, long amount) {
-    if(counterNameToEnum == null) {
-      return;
-    }
-
     String counterName = getWrappedCounterName(name);
     ProgressCounter pc = counterNameToEnum.get(counterName);
 
@@ -1525,6 +1471,15 @@ public abstract class Operator<T extends
     return true;
   }
 
+  /*
+   * If this task contains a sort-merge join, it can only be converted to a map-join task if
+   * every operator present in the mapper allows it. For example, if a sort-merge join operator
+   * is followed by a regular join, the task cannot be converted to an auto map-join.
+   */
+  public boolean opAllowedBeforeSortMergeJoin() {
+    return true;
+  }
+
   public String toString() {
     return getName() + "[" + getIdentifier() + "]";
   }

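The inlined counter logic in process() and forward() preserves the old buffering scheme: per-row work is a local increment plus a modulo check, and the shared Hadoop counters are touched only every 1000 rows, with a final flush in close(). A self-contained sketch of that buffering pattern, with an AtomicLong standing in for a Hadoop counter:

    import java.util.concurrent.atomic.AtomicLong;

    public class BufferedCounterDemo {
      static final AtomicLong reportedRows = new AtomicLong(); // stand-in for a Hadoop counter
      static long inputRows = 0;                               // cheap local buffer

      static void process(Object row, int tag) {
        inputRows++;
        if (inputRows % 1000 == 0) {       // same cadence as Operator.process()
          reportedRows.addAndGet(inputRows);
          inputRows = 0;
        }
        // ... processOp(row, tag) ...
      }

      public static void main(String[] args) {
        for (int i = 0; i < 2500; i++) {
          process(null, 0);
        }
        reportedRows.addAndGet(inputRows); // final flush, as Operator.close() does
        System.out.println(reportedRows.get()); // 2500
      }
    }
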
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Fri Apr 26 04:59:50 2013
@@ -72,14 +72,7 @@ public class PTFOperator extends Operato
 		hiveConf = new HiveConf(jobConf, PTFOperator.class);
 		// if the parent is ExtractOperator, this invocation is from reduce-side
 		Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
-		if (parentOp instanceof ExtractOperator)
-		{
-			isMapOperator = false;
-		}
-		else
-		{
-			isMapOperator = true;
-		}
+		isMapOperator = conf.isMapSide();
 
 		reconstructQueryDef(hiveConf);
     inputPart = createFirstPartitionForChain(

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFUtils.java Fri Apr 26 04:59:50 2013
@@ -31,6 +31,7 @@ import java.beans.XMLDecoder;
 import java.beans.XMLEncoder;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -261,25 +262,26 @@ public class PTFUtils {
     }
   }
 
-  public static void makeTransient(Class<?> beanClass, String pdName)
-  {
-    BeanInfo info;
-    try
-    {
-      info = Introspector.getBeanInfo(beanClass);
-      PropertyDescriptor[] propertyDescriptors = info
-          .getPropertyDescriptors();
-      for (int i = 0; i < propertyDescriptors.length; ++i)
-      {
-        PropertyDescriptor pd = propertyDescriptors[i];
-        if (pd.getName().equals(pdName))
-        {
-          pd.setValue("transient", Boolean.TRUE);
+  public static void makeTransient(Class<?> beanClass, String... pdNames) {
+    try {
+      BeanInfo info = Introspector.getBeanInfo(beanClass);
+      PropertyDescriptor[] descs = info.getPropertyDescriptors();
+      if (descs == null) {
+        throw new RuntimeException("Cannot access property descriptor for class " + beanClass);
+      }
+      Map<String, PropertyDescriptor> mapping = new HashMap<String, PropertyDescriptor>();
+      for (PropertyDescriptor desc : descs) {
+        mapping.put(desc.getName(), desc);
+      }
+      for (String pdName : pdNames) {
+        PropertyDescriptor desc = mapping.get(pdName);
+        if (desc == null) {
+          throw new RuntimeException("Property " + pdName + " does not exist in " + beanClass);
         }
+        desc.setValue("transient", Boolean.TRUE);
       }
     }
-    catch (IntrospectionException ie)
-    {
+    catch (IntrospectionException ie) {
       throw new RuntimeException(ie);
     }
   }

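makeTransient works because XMLEncoder skips any bean property whose PropertyDescriptor carries the attribute "transient" set to Boolean.TRUE, and because Introspector caches BeanInfo per class, so mutating the cached descriptor affects later encodings. A runnable JDK-only demonstration (the Bean class here is hypothetical):

    import java.beans.BeanInfo;
    import java.beans.Introspector;
    import java.beans.PropertyDescriptor;
    import java.beans.XMLEncoder;
    import java.io.ByteArrayOutputStream;

    public class TransientDemo {
      public static class Bean {
        private String kept, skipped;
        public String getKept() { return kept; }
        public void setKept(String v) { kept = v; }
        public String getSkipped() { return skipped; }
        public void setSkipped(String v) { skipped = v; }
      }

      public static void main(String[] args) throws Exception {
        // Equivalent of PTFUtils.makeTransient(Bean.class, "skipped").
        BeanInfo info = Introspector.getBeanInfo(Bean.class);
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
          if (pd.getName().equals("skipped")) {
            pd.setValue("transient", Boolean.TRUE);
          }
        }
        Bean b = new Bean();
        b.setKept("a");
        b.setSkipped("b");
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        XMLEncoder enc = new XMLEncoder(bos);
        enc.writeObject(b);
        enc.close();
        System.out.println(bos.toString().contains("skipped")); // false: omitted
      }
    }
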
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Fri Apr 26 04:59:50 2013
@@ -80,6 +80,15 @@ public class ReduceSinkOperator extends 
   transient byte[] tagByte = new byte[1];
   transient protected int numDistributionKeys;
   transient protected int numDistinctExprs;
+  transient String inputAlias;  // input alias of this RS for join (used for PPD)
+
+  public void setInputAlias(String inputAlias) {
+    this.inputAlias = inputAlias;
+  }
+
+  public String getInputAlias() {
+    return inputAlias;
+  }
 
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Task.java Fri Apr 26 04:59:50 2013
@@ -80,6 +80,7 @@ public abstract class Task<T extends Ser
   // hive.auto.convert.join.noconditionaltask is set to true. No conditional task was
   // created in case the mapjoin failed.
   public static final int MAPJOIN_ONLY_NOBACKUP = 6;
+  public static final int CONVERTED_SORTMERGEJOIN = 7;
 
   // Descendants tasks who subscribe feeds from this task
   protected transient List<Task<? extends Serializable>> feedSubscribers;

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Fri Apr 26 04:59:50 2013
@@ -163,4 +163,12 @@ public class UnionOperator extends Opera
   public boolean opAllowedAfterMapJoin() {
     return false;
   }
+
+  @Override
+  public boolean opAllowedBeforeSortMergeJoin() {
+    // If a union occurs before the sort-merge join, it is not useful to convert the
+    // sort-merge join to a mapjoin. The number of inputs for the union is more than 1, so
+    // it would be difficult to figure out the big table for the mapjoin.
+    return false;
+  }
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Fri Apr 26 04:59:50 2013
@@ -380,7 +380,7 @@ public final class Utilities {
 
   public static String getHiveJobID(Configuration job) {
     String planPath = HiveConf.getVar(job, HiveConf.ConfVars.PLAN);
-    if (planPath != null) {
+    if (planPath != null && !planPath.isEmpty()) {
       return (new Path(planPath)).getName();
     }
     return null;
@@ -2415,8 +2415,5 @@ public final class Utilities {
 
     return sb.toString();
   }
-
-  public static Class getBuiltinUtilsClass() throws ClassNotFoundException {
-    return Class.forName("org.apache.hive.builtins.BuiltinUtils");
-  }
 }
+

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HashMapWrapper.java Fri Apr 26 04:59:50 2013
@@ -158,8 +158,6 @@ public class HashMapWrapper<K, V> implem
   }
 
   public boolean isAbort(long numRows,LogHelper console) {
-    System.gc();
-    System.gc();
     int size = mHash.size();
     long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
     double rate = (double) usedMemory / (double) maxMemory;

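The two dropped System.gc() calls forced full collections on every check just to tighten the getUsed() reading; isAbort() now accepts a slightly pessimistic number instead of stop-the-world pauses during hash-table load. The remaining check, shown in isolation:

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;

    public class HeapRateDemo {
      public static void main(String[] args) {
        MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
        long maxMemory = Runtime.getRuntime().maxMemory();
        long usedMemory = memoryMXBean.getHeapMemoryUsage().getUsed();
        // Abort loading when the usage rate crosses the configured threshold
        // (hive.mapjoin.localtask.max.memory.usage, 0.90 by default).
        double rate = (double) usedMemory / (double) maxMemory;
        System.out.println("rate=" + rate + " abort=" + (rate > 0.90));
      }
    }
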
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinDoubleKeys.java Fri Apr 26 04:59:50 2013
@@ -23,8 +23,10 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -93,7 +95,7 @@ public class MapJoinDoubleKeys extends A
   public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
     try {
       // get the tableDesc from the map stored in the mapjoin operator
-      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+      HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(Integer.valueOf(metadataTag));
 
       Writable val = ctx.getSerDe().getSerializedClass().newInstance();
       val.readFields(in);
@@ -124,7 +126,8 @@ public class MapJoinDoubleKeys extends A
     try {
       // out.writeInt(metadataTag);
       // get the tableDesc from the map stored in the mapjoin operator
-      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+      HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+          Integer.valueOf(metadataTag));
 
       ArrayList<Object> list = MapJoinMetaData.getList();
       list.add(obj1);

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectKey.java Fri Apr 26 04:59:50 2013
@@ -23,8 +23,9 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -95,7 +96,7 @@ public class MapJoinObjectKey  extends A
       ClassNotFoundException {
     try {
       // get the tableDesc from the map stored in the mapjoin operator
-      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(
+      HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
           Integer.valueOf(metadataTag));
 
       Writable val = ctx.getSerDe().getSerializedClass().newInstance();
@@ -119,7 +120,7 @@ public class MapJoinObjectKey  extends A
   public void writeExternal(ObjectOutput out) throws IOException {
     try {
       // get the tableDesc from the map stored in the mapjoin operator
-      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(
+      HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
           Integer.valueOf(metadataTag));
 
       // Different processing for key and value

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinObjectValue.java Fri Apr 26 04:59:50 2013
@@ -24,8 +24,10 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.io.ShortWritable;
@@ -89,7 +91,8 @@ public class MapJoinObjectValue implemen
       metadataTag = in.readInt();
 
       // get the tableDesc from the map stored in the mapjoin operator
-      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+      HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
+          Integer.valueOf(metadataTag));
       int sz = in.readInt();
       MapJoinRowContainer<Object[]> res = new MapJoinRowContainer<Object[]>();
       if (sz > 0) {
@@ -132,7 +135,8 @@ public class MapJoinObjectValue implemen
       out.writeInt(metadataTag);
 
       // get the tableDesc from the map stored in the mapjoin operator
-      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+      HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+          Integer.valueOf(metadataTag));
 
       // Different processing for key and value
       MapJoinRowContainer<Object[]> v = obj;

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinSingleKey.java Fri Apr 26 04:59:50 2013
@@ -23,8 +23,10 @@ import java.io.ObjectInput;
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 
-import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator;
 import org.apache.hadoop.hive.ql.exec.HashTableSinkOperator.HashTableSinkObjectCtx;
+import org.apache.hadoop.hive.ql.exec.MapJoinMetaData;
+import org.apache.hadoop.hive.ql.exec.MapJoinOperator;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
@@ -74,10 +76,12 @@ public class MapJoinSingleKey extends Ab
   }
 
   @Override
-  public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+  public void readExternal(ObjectInput in)
+      throws IOException, ClassNotFoundException {
     try {
       // get the tableDesc from the map stored in the mapjoin operator
-      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+      HashTableSinkObjectCtx ctx = MapJoinOperator.getMetadata().get(
+          Integer.valueOf(metadataTag));
 
       Writable val = ctx.getSerDe().getSerializedClass().newInstance();
       val.readFields(in);
@@ -106,7 +110,8 @@ public class MapJoinSingleKey extends Ab
     try {
       // out.writeInt(metadataTag);
       // get the tableDesc from the map stored in the mapjoin operator
-      HashTableSinkObjectCtx ctx = MapJoinMetaData.get(Integer.valueOf(metadataTag));
+      HashTableSinkObjectCtx ctx = HashTableSinkOperator.getMetadata().get(
+          Integer.valueOf(metadataTag));
 
       ArrayList<Object> list = MapJoinMetaData.getList();
       list.add(obj);

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/RCFile.java Fri Apr 26 04:59:50 2013
@@ -243,6 +243,12 @@ public class RCFile {
       this.numberRows = numberRows;
     }
 
+    public void nullColumn(int columnIndex) {
+      eachColumnValueLen[columnIndex] = 0;
+      eachColumnUncompressedValueLen[columnIndex] = 0;
+      allCellValLenBuffer[columnIndex] = new NonSyncDataOutputBuffer();
+    }
+
     /**
      * add in a new column's meta data.
      *
@@ -553,6 +559,14 @@ public class RCFile {
       }
     }
 
+    public void nullColumn(int columnIndex) {
+      if (codec != null) {
+        compressedColumnsValueBuffer[columnIndex].reset();
+      } else {
+        loadedColumnsValueBuffer[columnIndex].reset();
+      }
+    }
+
     public void clearColumnBuffer() throws IOException {
       decompressBuffer.reset();
     }
@@ -1077,6 +1091,7 @@ public class RCFile {
       public int rowReadIndex;
       public int runLength;
       public int prvLength;
+      public boolean isNulled;
     }
     private final Path file;
     private final FSDataInputStream in;
@@ -1491,6 +1506,7 @@ public class RCFile {
         col.rowReadIndex = 0;
         col.runLength = 0;
         col.prvLength = -1;
+        col.isNulled = colValLenBufferReadIn[selIx].getLength() == 0;
       }
 
       return currentKeyLength;
@@ -1694,18 +1710,22 @@ public class RCFile {
           SelectedColumn col = selectedColumns[j];
           int i = col.colIndex;
 
-          BytesRefWritable ref = ret.unCheckedGet(i);
+          if (col.isNulled) {
+            ret.set(i, null);
+          } else {
+            BytesRefWritable ref = ret.unCheckedGet(i);
 
-          colAdvanceRow(j, col);
+            colAdvanceRow(j, col);
 
-          if (currentValue.decompressedFlag[j]) {
-            ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
-                col.rowReadIndex, col.prvLength);
-          } else {
-            ref.set(currentValue.lazyDecompressCallbackObjs[j],
-                col.rowReadIndex, col.prvLength);
+            if (currentValue.decompressedFlag[j]) {
+              ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+                  col.rowReadIndex, col.prvLength);
+            } else {
+              ref.set(currentValue.lazyDecompressCallbackObjs[j],
+                  col.rowReadIndex, col.prvLength);
+            }
+            col.rowReadIndex += col.prvLength;
           }
-          col.rowReadIndex += col.prvLength;
         }
       } else {
         // This version of the loop eliminates a condition check and branch
@@ -1714,12 +1734,16 @@ public class RCFile {
           SelectedColumn col = selectedColumns[j];
           int i = col.colIndex;
 
-          BytesRefWritable ref = ret.unCheckedGet(i);
+          if (col.isNulled) {
+            ret.set(i, null);
+          } else {
+            BytesRefWritable ref = ret.unCheckedGet(i);
 
-          colAdvanceRow(j, col);
-          ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
-                col.rowReadIndex, col.prvLength);
-          col.rowReadIndex += col.prvLength;
+            colAdvanceRow(j, col);
+            ref.set(currentValue.loadedColumnsValueBuffer[j].getData(),
+                  col.rowReadIndex, col.prvLength);
+            col.rowReadIndex += col.prvLength;
+          }
         }
       }
       rowFetched = true;

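On the read path, a truncated column is signaled purely by a zero-length value-length buffer in the key section: the reader sets col.isNulled from it, and the row-assembly loops above then store null instead of a BytesRefWritable. The convention in miniature (illustrative, not RCFile code):

    // Toy model: a zero-length column buffer means "column truncated, emit nulls".
    byte[][] columnBuffers = { {1, 2, 3}, {}, {7, 8} };  // column 1 was truncated
    Object[] row = new Object[columnBuffers.length];
    for (int i = 0; i < columnBuffers.length; i++) {
      boolean isNulled = (columnBuffers[i].length == 0); // cf. col.isNulled
      row[i] = isNulled ? null : columnBuffers[i];       // cf. ret.set(i, null)
    }
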
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Fri Apr 26 04:59:50 2013
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -70,13 +71,23 @@ public final class OrcFile {
    */
   public static Writer createWriter(FileSystem fs,
                                     Path path,
+                                    Configuration conf,
                                     ObjectInspector inspector,
                                     long stripeSize,
                                     CompressionKind compress,
                                     int bufferSize,
                                     int rowIndexStride) throws IOException {
     return new WriterImpl(fs, path, inspector, stripeSize, compress,
-      bufferSize, rowIndexStride);
+      bufferSize, rowIndexStride, getMemoryManager(conf));
   }
 
+  private static MemoryManager memoryManager = null;
+
+  private static synchronized
+  MemoryManager getMemoryManager(Configuration conf) {
+    if (memoryManager == null) {
+      memoryManager = new MemoryManager(conf);
+    }
+    return memoryManager;
+  }
 }

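getMemoryManager is a lazily constructed, JVM-wide singleton guarded by the class lock, so every ORC writer in the same task shares one memory budget; note that the Configuration of the first caller wins and later ones are ignored. The same pattern in isolation:

    // Lazily initialized, thread-safe shared manager via a synchronized accessor.
    public final class SharedMemoryManager {
      private static SharedMemoryManager instance = null;
      private final double poolFraction;

      private SharedMemoryManager(double poolFraction) {
        this.poolFraction = poolFraction;
      }

      public double getPoolFraction() { return poolFraction; }

      // The class-level lock makes check-then-create race-free; as in OrcFile,
      // only the first caller's argument is ever used.
      public static synchronized SharedMemoryManager get(double poolFraction) {
        if (instance == null) {
          instance = new SharedMemoryManager(poolFraction);
        }
        return instance;
      }
    }
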
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Fri Apr 26 04:59:50 2013
@@ -71,8 +71,8 @@ public class OrcOutputFormat extends Fil
     public void write(NullWritable nullWritable,
                       OrcSerdeRow row) throws IOException {
       if (writer == null) {
-        writer = OrcFile.createWriter(fs, path, row.getInspector(), stripeSize,
-          compress, compressionSize, rowIndexStride);
+        writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
+            stripeSize, compress, compressionSize, rowIndexStride);
       }
       writer.addRow(row.getRow());
     }
@@ -81,8 +81,9 @@ public class OrcOutputFormat extends Fil
     public void write(Writable row) throws IOException {
       OrcSerdeRow serdeRow = (OrcSerdeRow) row;
       if (writer == null) {
-        writer = OrcFile.createWriter(fs, path, serdeRow.getInspector(),
-            stripeSize, compress, compressionSize, rowIndexStride);
+        writer = OrcFile.createWriter(fs, path, this.conf,
+            serdeRow.getInspector(), stripeSize, compress, compressionSize,
+            rowIndexStride);
       }
       writer.addRow(serdeRow.getRow());
     }
@@ -101,8 +102,8 @@ public class OrcOutputFormat extends Fil
         ObjectInspector inspector = ObjectInspectorFactory.
             getStandardStructObjectInspector(new ArrayList<String>(),
                 new ArrayList<ObjectInspector>());
-        writer = OrcFile.createWriter(fs, path, inspector, stripeSize,
-            compress, compressionSize, rowIndexStride);
+        writer = OrcFile.createWriter(fs, path, this.conf, inspector,
+            stripeSize, compress, compressionSize, rowIndexStride);
       }
       writer.close();
     }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcStruct.java Fri Apr 26 04:59:50 2013
@@ -17,6 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
 import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -31,16 +38,9 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo;
 import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
 final class OrcStruct implements Writable {
 
-  private final Object[] fields;
+  private Object[] fields;
 
   OrcStruct(int children) {
     fields = new Object[children];
@@ -54,6 +54,14 @@ final class OrcStruct implements Writabl
     fields[fieldIndex] = value;
   }
 
+  public int getNumFields() {
+    return fields.length;
+  }
+
+  public void setNumFields(int numFields) {
+    fields = new Object[numFields];
+  }
+
    @Override
   public void write(DataOutput dataOutput) throws IOException {
     throw new UnsupportedOperationException("write unsupported");

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Fri Apr 26 04:59:50 2013
@@ -733,8 +733,9 @@ class RecordReaderImpl implements Record
         } else {
           length = dictionaryBuffer.size() - offset;
         }
-        // If the column is just empty strings, the size will be zero, so the buffer will be null,
-        // in that case just return result as it will default to empty
+        // If the column is just empty strings, the size will be zero,
+        // so the buffer will be null, in that case just return result
+        // as it will default to empty
         if (dictionaryBuffer != null) {
           dictionaryBuffer.setText(result, offset, length);
         } else {
@@ -788,6 +789,13 @@ class RecordReaderImpl implements Record
           result = new OrcStruct(fields.length);
         } else {
           result = (OrcStruct) previous;
+
+          // If the input format was initialized with a file with a
+          // different number of fields, the number of fields needs to
+          // be updated to the correct number
+          if (result.getNumFields() != fields.length) {
+            result.setNumFields(fields.length);
+          }
         }
         for(int i=0; i < fields.length; ++i) {
           if (fields[i] != null) {

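The reuse path above is why OrcStruct.fields lost its final modifier: setNumFields(int) replaces the field array outright, so a struct handed back by the caller can be resized to match the current file's schema. A small sketch of the resize, using the accessors added in this commit:

    // Sketch only: resize a reused OrcStruct when the current file has a
    // different field count than the file the object was created against.
    OrcStruct struct = (OrcStruct) previous;
    if (struct.getNumFields() != fields.length) {
      // allocates a fresh Object[]; any previously stored values are dropped
      struct.setNumFields(fields.length);
    }
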
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Fri Apr 26 04:59:50 2013
@@ -18,8 +18,15 @@
 
 package org.apache.hadoop.hive.ql.io.orc;
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,14 +51,8 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 
-import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
 
 /**
  * An ORC file writer. The file is divided into stripes, which is the natural
@@ -62,7 +63,7 @@ import java.util.TreeMap;
  * sub-types. Each of the TreeWriters writes the column's data as a set of
  * streams.
  */
-class WriterImpl implements Writer {
+class WriterImpl implements Writer, MemoryManager.Callback {
 
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
@@ -97,6 +98,7 @@ class WriterImpl implements Writer {
   private final OrcProto.RowIndex.Builder rowIndex =
       OrcProto.RowIndex.newBuilder();
   private final boolean buildIndex;
+  private final MemoryManager memoryManager;
 
   WriterImpl(FileSystem fs,
              Path path,
@@ -104,13 +106,15 @@ class WriterImpl implements Writer {
              long stripeSize,
              CompressionKind compress,
              int bufferSize,
-             int rowIndexStride) throws IOException {
+             int rowIndexStride,
+             MemoryManager memoryManager) throws IOException {
     this.fs = fs;
     this.path = path;
     this.stripeSize = stripeSize;
     this.compress = compress;
     this.bufferSize = bufferSize;
     this.rowIndexStride = rowIndexStride;
+    this.memoryManager = memoryManager;
     buildIndex = rowIndexStride > 0;
     codec = createCodec(compress);
     treeWriter = createTreeWriter(inspector, streamFactory, false);
@@ -118,6 +122,8 @@ class WriterImpl implements Writer {
       throw new IllegalArgumentException("Row stride must be at least " +
           MIN_ROW_INDEX_STRIDE);
     }
+    // ensure that we are able to handle callbacks before we register ourselves
+    memoryManager.addWriter(path, stripeSize, this);
   }
 
   static CompressionCodec createCodec(CompressionKind kind) {
@@ -147,6 +153,13 @@ class WriterImpl implements Writer {
     }
   }
 
+  @Override
+  public void checkMemory(double newScale) throws IOException {
+    if (estimateStripeSize() > Math.round(stripeSize * newScale)) {
+      flushStripe();
+    }
+  }
+
   /**
    * This class is used to hold the contents of streams as they are buffered.
    * The TreeWriters write to the outStream and the codec compresses the
@@ -734,19 +747,8 @@ class WriterImpl implements Writer {
       int length = rows.size();
       int rowIndexEntry = 0;
       OrcProto.RowIndex.Builder rowIndex = getRowIndex();
-      // need to build the first index entry out here, to handle the case of
-      // not having any values.
-      if (buildIndex) {
-        while (0 == rowIndexValueCount.get(rowIndexEntry) &&
-            rowIndexEntry < savedRowIndex.size()) {
-          OrcProto.RowIndexEntry.Builder base =
-              savedRowIndex.get(rowIndexEntry++).toBuilder();
-          rowOutput.getPosition(new RowIndexPositionRecorder(base));
-          rowIndex.addEntry(base.build());
-        }
-      }
       // write the values translated into the dump order.
-      for(int i = 0; i < length; ++i) {
+      for(int i = 0; i <= length; ++i) {
         // now that we are writing out the row values, we can finalize the
         // row index
         if (buildIndex) {
@@ -758,7 +760,9 @@ class WriterImpl implements Writer {
             rowIndex.addEntry(base.build());
           }
         }
-        rowOutput.write(dumpOrder[rows.get(i)]);
+        if (i != length) {
+          rowOutput.write(dumpOrder[rows.get(i)]);
+        }
       }
       // we need to build the rowindex before calling super, since it
       // writes it out.
@@ -1453,8 +1457,8 @@ class WriterImpl implements Writer {
       }
     }
     // once every 1000 rows, check the size to see if we should spill
-    if (rowsInStripe % 1000 == 0 && estimateStripeSize() > stripeSize) {
-      flushStripe();
+    if (rowsInStripe % 1000 == 0) {
+      checkMemory(memoryManager.getAllocationScale());
     }
   }
 
@@ -1464,5 +1468,6 @@ class WriterImpl implements Writer {
     int footerLength = writeFooter(rawWriter.getPos());
     rawWriter.writeByte(writePostScript(footerLength));
     rawWriter.close();
+    memoryManager.removeWriter(path);
   }
 }

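The WriterImpl changes replace the fixed per-writer stripe threshold with a coordinated one: every 1000 rows the writer asks the shared MemoryManager for the current allocation scale and flushes early if its estimated stripe exceeds stripeSize * scale. The MemoryManager class itself is added elsewhere in this change set; the contract implied by the call sites above looks roughly like this (a sketch inferred from the diff, not the committed interface):

    // Sketch of the contract WriterImpl now depends on; method shapes are
    // inferred from the call sites in this diff and may differ in detail.
    interface MemoryManagerSketch {
      interface Callback {
        // newScale is the fraction of its requested allocation each writer
        // may currently use; writers flush a stripe early when over budget
        void checkMemory(double newScale) throws IOException;
      }
      // registered at the end of the WriterImpl constructor, once the
      // writer is able to service callbacks
      void addWriter(Path path, long requestedAllocation, Callback callback);
      // called from WriterImpl.close()
      void removeWriter(Path path);
      // polled every 1000 rows via checkMemory(getAllocationScale())
      double getAllocationScale();
    }
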
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileKeyBufferWrapper.java Fri Apr 26 04:59:50 2013
@@ -38,7 +38,7 @@ public class RCFileKeyBufferWrapper impl
 
   protected CompressionCodec codec;
 
-  protected RCFileKeyBufferWrapper() {
+  public RCFileKeyBufferWrapper() {
   }
 
   public static RCFileKeyBufferWrapper create(KeyBuffer currentKeyBufferObj) {
@@ -66,4 +66,48 @@ public class RCFileKeyBufferWrapper impl
     return keyBuffer;
   }
 
+  public void setKeyBuffer(KeyBuffer keyBuffer) {
+    this.keyBuffer = keyBuffer;
+  }
+
+  public int getRecordLength() {
+    return recordLength;
+  }
+
+  public void setRecordLength(int recordLength) {
+    this.recordLength = recordLength;
+  }
+
+  public int getKeyLength() {
+    return keyLength;
+  }
+
+  public void setKeyLength(int keyLength) {
+    this.keyLength = keyLength;
+  }
+
+  public int getCompressedKeyLength() {
+    return compressedKeyLength;
+  }
+
+  public void setCompressedKeyLength(int compressedKeyLength) {
+    this.compressedKeyLength = compressedKeyLength;
+  }
+
+  public Path getInputPath() {
+    return inputPath;
+  }
+
+  public void setInputPath(Path inputPath) {
+    this.inputPath = inputPath;
+  }
+
+  public CompressionCodec getCodec() {
+    return codec;
+  }
+
+  public void setCodec(CompressionCodec codec) {
+    this.codec = codec;
+  }
+
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/io/rcfile/merge/RCFileValueBufferWrapper.java Fri Apr 26 04:59:50 2013
@@ -48,4 +48,12 @@ public class RCFileValueBufferWrapper im
     return this.valueBuffer.compareTo(o.valueBuffer);
   }
 
+  public ValueBuffer getValueBuffer() {
+    return valueBuffer;
+  }
+
+  public void setValueBuffer(ValueBuffer valueBuffer) {
+    this.valueBuffer = valueBuffer;
+  }
+
 }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java Fri Apr 26 04:59:50 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.hive.serde2.laz
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.mapred.SequenceFileInputFormat;
 import org.apache.hadoop.mapred.SequenceFileOutputFormat;
@@ -87,6 +88,11 @@ public class DefaultStorageHandler imple
   }
 
   @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+    // do nothing by default
+  }
+
+  @Override
   public Configuration getConf() {
     return conf;
   }

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java Fri Apr 26 04:59:50 2013
@@ -1671,7 +1671,7 @@ private void constructOneLBLocationMap(F
     List<String> names = null;
     Table t = getTable(dbName, tblName);
 
-    List<String> pvals = getPvals(t.getPartCols(), partSpec);
+    List<String> pvals = MetaStoreUtils.getPvals(t.getPartCols(), partSpec);
 
     try {
       names = getMSC().listPartitionNames(dbName, tblName, pvals, max);
@@ -1713,19 +1713,6 @@ private void constructOneLBLocationMap(F
     }
   }
 
-  public static List<String> getPvals(List<FieldSchema> partCols,
-      Map<String, String> partSpec) {
-    List<String> pvals = new ArrayList<String>();
-    for (FieldSchema field : partCols) {
-      String val = partSpec.get(field.getName());
-      if (val == null) {
-        val = "";
-      }
-      pvals.add(val);
-    }
-    return pvals;
-  }
-
   /**
    * get all the partitions of the table that match the given partial
    * specification. partition columns whose value can be anything should be
@@ -1745,7 +1732,7 @@ private void constructOneLBLocationMap(F
           "partitioned table");
     }
 
-    List<String> partialPvals = getPvals(tbl.getPartCols(), partialPartSpec);
+    List<String> partialPvals = MetaStoreUtils.getPvals(tbl.getPartCols(), partialPartSpec);
 
     List<org.apache.hadoop.hive.metastore.api.Partition> partitions = null;
     try {
@@ -2251,6 +2238,18 @@ private void constructOneLBLocationMap(F
     }
   }
 
+  public void exchangeTablePartitions(Map<String, String> partitionSpecs,
+      String sourceDb, String sourceTable, String destDb,
+      String destinationTableName) throws HiveException {
+    try {
+      getMSC().exchange_partition(partitionSpecs, sourceDb, sourceTable, destDb,
+        destinationTableName);
+    } catch (Exception ex) {
+      LOG.error(StringUtils.stringifyException(ex));
+      throw new HiveException(ex);
+    }
+  }
+
   /**
    * Creates a metastore client. Currently it creates only JDBC based client as
    * File based store support is removed

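The new exchangeTablePartitions wrapper simply forwards to the metastore client's exchange_partition and rethrows failures as HiveException. A hypothetical caller (database, table, and partition names here are illustrative, not from this commit):

    // Sketch: move the matching partition from a staging table into a
    // production table via the metastore exchange.
    Map<String, String> partitionSpecs = new HashMap<String, String>();
    partitionSpecs.put("ds", "2013-04-26");   // hypothetical partition spec
    Hive db = Hive.get(conf);
    db.exchangeTablePartitions(partitionSpecs,
        "staging", "clicks_stg",   // source database and table
        "prod", "clicks");         // destination database and table
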
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java Fri Apr 26 04:59:50 2013
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
 
 /**
@@ -133,4 +134,12 @@ public interface HiveStorageHandler exte
   public void configureTableJobProperties(
     TableDesc tableDesc,
     Map<String, String> jobProperties);
+
+  /**
+   * Called just before submitting the MapReduce job.
+   *
+   * @param tableDesc descriptor for the table being accessed
+   * @param jobConf JobConf for the MapReduce job
+   */
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf);
 }

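DefaultStorageHandler pairs this new interface method with a no-op override (above), so existing handlers keep compiling; a handler that needs per-job setup overrides the hook. A hypothetical example (class and property names are illustrative):

    // Sketch of a custom handler using the new hook; runs just before job
    // submission, so settings land in the final JobConf.
    public class AuditingStorageHandler extends DefaultStorageHandler {
      @Override
      public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
        jobConf.set("audit.table.name", tableDesc.getTableName());
      }
    }
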
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Fri Apr 26 04:59:50 2013
@@ -472,6 +472,10 @@ abstract public class AbstractSMBJoinPro
       (BigTableSelectorForAutoSMJ) ReflectionUtils.newInstance(bigTableMatcherClass, null);
     int bigTablePosition =
       bigTableMatcher.getBigTablePosition(pGraphContext, joinOp);
+    if (bigTablePosition < 0) {
+      // the big table could not be chosen, e.g. the join contains aliases
+      // from a sub-query
+      return false;
+    }
     context.setBigTablePosition(bigTablePosition);
     String joinAlias =
       bigTablePosition == 0 ?

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AvgPartitionSizeBasedBigTableSelectorForAutoSMJ.java Fri Apr 26 04:59:50 2013
@@ -57,6 +57,9 @@ public class AvgPartitionSizeBasedBigTab
       getListTopOps(joinOp, topOps);
       int currentPos = 0;
       for (TableScanOperator topOp : topOps) {
+        if (topOp == null) {
+          // no table scan available for this alias (it comes from a
+          // sub-query), so a big table position cannot be chosen
+          return -1;
+        }
         int numPartitions = 1; // in case the sizes match, preference is
                                // given to the table with fewer partitions
         Table table = parseCtx.getTopToTable().get(topOp);

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java Fri Apr 26 04:59:50 2013
@@ -249,6 +249,7 @@ public class BucketingSortingReduceSinkO
       fsOp.getConf().setMultiFileSpray(false);
       fsOp.getConf().setTotalFiles(1);
       fsOp.getConf().setNumFiles(1);
+      fsOp.getConf().setRemovedReduceSinkBucketSort(true);
       tsOp.setUseBucketizedHiveInputFormat(true);
     }
 

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRUnion1.java Fri Apr 26 04:59:50 2013
@@ -259,6 +259,9 @@ public class GenMRUnion1 implements Node
     // Copy into the current union task plan if this is a map-only
     // sub-query that is also a root task
     if (uPrsCtx.getMapOnlySubq(pos) && uPrsCtx.getRootTask(pos)) {
       processSubQueryUnionMerge(ctx, uCtxTask, union, stack);
+      if (ctx.getRootTasks().contains(currTask)) {
+        ctx.getRootTasks().remove(currTask);
+      }
     }
     // If it a map-reduce job, create a temporary file
     else {

Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Fri Apr 26 04:59:50 2013
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.exec.ConditionalTask;
+import org.apache.hadoop.hive.ql.exec.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.JoinOperator;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorFactory;
@@ -761,6 +763,41 @@ public final class GenMapRedUtils {
   }
 
   /**
+   * Set the key and value description for all the tasks rooted at the given
+   * task. Loops over all the tasks recursively.
+   *
+   * @param task the root of the task tree to walk
+   */
+  public static void setKeyAndValueDescForTaskTree(Task<? extends Serializable> task) {
+
+    if (task instanceof ConditionalTask) {
+      List<Task<? extends Serializable>> listTasks = ((ConditionalTask) task)
+          .getListTasks();
+      for (Task<? extends Serializable> tsk : listTasks) {
+        setKeyAndValueDescForTaskTree(tsk);
+      }
+    } else if (task instanceof ExecDriver) {
+      MapredWork work = (MapredWork) task.getWork();
+      work.deriveExplainAttributes();
+      HashMap<String, Operator<? extends OperatorDesc>> opMap = work
+          .getAliasToWork();
+      if (opMap != null && !opMap.isEmpty()) {
+        for (Operator<? extends OperatorDesc> op : opMap.values()) {
+          setKeyAndValueDesc(work, op);
+        }
+      }
+    }
+
+    if (task.getChildTasks() == null) {
+      return;
+    }
+
+    for (Task<? extends Serializable> childTask : task.getChildTasks()) {
+      setKeyAndValueDescForTaskTree(childTask);
+    }
+  }
+
+  /**
    * create a new plan and return.
    *
    * @return the new plan

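The helper recurses through both ConditionalTask children and ordinary child tasks, so callers only need to hand it the roots of the physical plan. A sketch of the intended use (the surrounding variable is illustrative):

    // derive key/value descriptors for every MapredWork in the plan
    for (Task<? extends Serializable> rootTask : physicalContext.getRootTasks()) {
      GenMapRedUtils.setKeyAndValueDescForTaskTree(rootTask);
    }
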
Modified: hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java?rev=1476039&r1=1476038&r2=1476039&view=diff
==============================================================================
--- hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java (original)
+++ hive/branches/HIVE-4115/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GroupByOptimizer.java Fri Apr 26 04:59:50 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hive.ql.optimiz
 import org.apache.hadoop.hive.ql.parse.ParseContext;
 import org.apache.hadoop.hive.ql.parse.PrunedPartitionList;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
+import org.apache.hadoop.hive.ql.plan.AggregationDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -60,6 +61,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.SelectDesc;
+import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode;
 import org.apache.hadoop.util.StringUtils;
 
 /**
@@ -107,7 +109,7 @@ public class GroupByOptimizer implements
     GraphWalker ogw = new DefaultGraphWalker(disp);
 
     // Create a list of topop nodes
-    ArrayList<Node> topNodes = new ArrayList<Node>();
+    List<Node> topNodes = new ArrayList<Node>();
     topNodes.addAll(pctx.getTopOps().values());
     ogw.startWalking(topNodes, null);
 
@@ -174,15 +176,83 @@ public class GroupByOptimizer implements
       GroupByOptimizerSortMatch match = checkSortGroupBy(stack, groupByOp);
       boolean useMapperSort =
           HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT);
+      GroupByDesc groupByOpDesc = groupByOp.getConf();
+
+      boolean removeReduceSink = false;
+      boolean optimizeDistincts = false;
+      boolean setBucketGroup = false;
 
       // Don't remove the operator for distincts
-      if (useMapperSort && !groupByOp.getConf().isDistinct() &&
+      if (useMapperSort &&
           (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
-        convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
+        if (!groupByOpDesc.isDistinct()) {
+          removeReduceSink = true;
+        }
+        else if (!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEGROUPBYSKEW)) {
+          // Optimize the query: select count(distinct keys) from T, where
+          // T is bucketized and sorted by keys
+          // Partial aggregation can be done by the mappers in this scenario
+
+          List<ExprNodeDesc> keys =
+              ((GroupByOperator)
+              (groupByOp.getChildOperators().get(0).getChildOperators().get(0)))
+                  .getConf().getKeys();
+          if ((keys == null) || (keys.isEmpty())) {
+            optimizeDistincts = true;
+          }
+        }
       }
-      else if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
+
+      if ((match == GroupByOptimizerSortMatch.PARTIAL_MATCH) ||
           (match == GroupByOptimizerSortMatch.COMPLETE_MATCH)) {
-        groupByOp.getConf().setBucketGroup(true);
+        setBucketGroup = true;
+      }
+
+      if (removeReduceSink) {
+        convertGroupByMapSideSortedGroupBy(hiveConf, groupByOp, depth);
+      }
+      else if (optimizeDistincts) {
+        // In test mode, don't change the query plan. However, set up a query property
+        pGraphContext.getQueryProperties().setHasMapGroupBy(true);
+        if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_MAP_GROUPBY_SORT_TESTMODE)) {
+          return;
+        }
+        ReduceSinkOperator reduceSinkOp =
+            (ReduceSinkOperator)groupByOp.getChildOperators().get(0);
+        GroupByDesc childGroupByDesc =
+            ((GroupByOperator)
+            (reduceSinkOp.getChildOperators().get(0))).getConf();
+
+        for (int pos = 0; pos < childGroupByDesc.getAggregators().size(); pos++) {
+          AggregationDesc aggr = childGroupByDesc.getAggregators().get(pos);
+          // Partial aggregation is not done for distincts on the mapper
+          // However, if the data is bucketed/sorted on the distinct key, partial aggregation
+          // can be performed on the mapper.
+          if (aggr.getDistinct()) {
+            ArrayList<ExprNodeDesc> parameters = new ArrayList<ExprNodeDesc>();
+            ExprNodeDesc param = aggr.getParameters().get(0);
+            assert param instanceof ExprNodeColumnDesc;
+            ExprNodeColumnDesc paramC = (ExprNodeColumnDesc) param;
+            paramC.setIsPartitionColOrVirtualCol(false);
+            paramC.setColumn("VALUE._col" + pos);
+            parameters.add(paramC);
+            aggr.setParameters(parameters);
+            aggr.setDistinct(false);
+            aggr.setMode(Mode.FINAL);
+          }
+        }
+        // Partial aggregation is performed on the mapper, no distinct processing at the reducer
+        childGroupByDesc.setDistinct(false);
+        groupByOpDesc.setDontResetAggrsDistinct(true);
+        groupByOpDesc.setBucketGroup(true);
+        groupByOp.setUseBucketizedHiveInputFormat(true);
+        // no distinct processing at the reducer
+        // A query like 'select count(distinct key) from T' is transformed into
+        // 'select count(key) from T' as far as the reducer is concerned.
+        reduceSinkOp.getConf().setDistinctColumnIndices(new ArrayList<List<Integer>>());
+      }
+      else if (setBucketGroup) {
+        groupByOpDesc.setBucketGroup(true);
       }
     }
 
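Concretely, for a hypothetical table T bucketed and sorted on key, 'select count(distinct key) from T' now takes the optimizeDistincts path: each mapper sees a complete, sorted run of its bucket's keys and can compute the partial aggregate itself, while the reducer-side aggregators are switched to Mode.FINAL over 'VALUE._col<pos>', their distinct flags cleared and distinctColumnIndices emptied. As the comments above put it, the reducer then behaves as if the query had been 'select count(key) from T' over the mappers' partial results.
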
@@ -339,8 +409,8 @@ public class GroupByOptimizer implements
 
         GroupByOptimizerSortMatch currentMatch =
             notDeniedPartns.isEmpty() ? GroupByOptimizerSortMatch.NO_MATCH :
-              notDeniedPartns.size() > 1 ? GroupByOptimizerSortMatch.PARTIAL_MATCH :
-                GroupByOptimizerSortMatch.COMPLETE_MATCH;
+                notDeniedPartns.size() > 1 ? GroupByOptimizerSortMatch.PARTIAL_MATCH :
+                    GroupByOptimizerSortMatch.COMPLETE_MATCH;
         for (Partition part : notDeniedPartns) {
           List<String> sortCols = part.getSortColNames();
           List<String> bucketCols = part.getBucketCols();
@@ -440,8 +510,9 @@ public class GroupByOptimizer implements
       case NO_MATCH:
         return GroupByOptimizerSortMatch.NO_MATCH;
       case COMPLETE_MATCH:
-        return ((bucketCols != null) && !bucketCols.isEmpty() && sortCols.containsAll(bucketCols)) ?
-          GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
+        return ((bucketCols != null) && !bucketCols.isEmpty() &&
+            sortCols.containsAll(bucketCols)) ?
+            GroupByOptimizerSortMatch.COMPLETE_MATCH : GroupByOptimizerSortMatch.PARTIAL_MATCH;
       case PREFIX_COL1_MATCH:
         return GroupByOptimizerSortMatch.NO_MATCH;
       case PREFIX_COL2_MATCH: