You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/11/05 08:01:58 UTC

svn commit: r1538880 [4/46] - in /hive/branches/tez: ./ ant/ ant/src/org/apache/hadoop/hive/ant/ beeline/ beeline/src/java/org/apache/hive/beeline/ beeline/src/main/ beeline/src/test/org/apache/hive/beeline/src/test/ cli/ common/ common/src/java/conf/ ...

Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveAlterHandler.java Tue Nov  5 07:01:32 2013
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidObjectException;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -195,6 +196,12 @@ public class HiveAlterHandler implements
             msdb.alterPartition(dbname, name, part.getValues(), part);
           }
         }
+      } else if (MetaStoreUtils.requireCalStats(hiveConf, null, null, newt) &&
+        (newt.getPartitionKeysSize() == 0)) {
+          Database db = msdb.getDatabase(newt.getDbName());
+          // Update table stats. For partitioned table, we update stats in
+          // alterPartition()
+          MetaStoreUtils.updateUnpartitionedTableStatsFast(db, newt, wh, false, true);
       }
       // now finally call alter table
       msdb.alterTable(dbname, name, newt);
@@ -254,10 +261,10 @@ public class HiveAlterHandler implements
     Path destPath = null;
     FileSystem srcFs = null;
     FileSystem destFs = null;
-    Table tbl = null;
     Partition oldPart = null;
     String oldPartLoc = null;
     String newPartLoc = null;
+
     // Set DDL time to now if not specified
     if (new_part.getParameters() == null ||
         new_part.getParameters().get(hive_metastoreConstants.DDL_TIME) == null ||
@@ -265,10 +272,15 @@ public class HiveAlterHandler implements
       new_part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
           .currentTimeMillis() / 1000));
     }
+
+    Table tbl = msdb.getTable(dbname, name);
     //alter partition
     if (part_vals == null || part_vals.size() == 0) {
       try {
         oldPart = msdb.getPartition(dbname, name, new_part.getValues());
+        if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
+          MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
+        }
         msdb.alterPartition(dbname, name, new_part.getValues(), new_part);
       } catch (InvalidObjectException e) {
         throw new InvalidOperationException("alter is not possible");
@@ -299,7 +311,6 @@ public class HiveAlterHandler implements
         throw new AlreadyExistsException("Partition already exists:" + dbname + "." + name + "." +
             new_part.getValues());
       }
-      tbl = msdb.getTable(dbname, name);
       if (tbl == null) {
         throw new InvalidObjectException(
             "Unable to rename partition because table or database do not exist");
@@ -351,6 +362,9 @@ public class HiveAlterHandler implements
               + tbl.getTableName() + " " + new_part.getValues());
           }
           new_part.getSd().setLocation(newPartLoc);
+          if (MetaStoreUtils.requireCalStats(hiveConf, oldPart, new_part, tbl)) {
+            MetaStoreUtils.updatePartitionStatsFast(new_part, wh, false, true);
+          }
           msdb.alterPartition(dbname, name, part_vals, new_part);
         }
       }
@@ -399,6 +413,7 @@ public class HiveAlterHandler implements
       MetaException {
     List<Partition> oldParts = new ArrayList<Partition>();
     List<List<String>> partValsList = new ArrayList<List<String>>();
+    Table tbl = msdb.getTable(dbname, name);
     try {
       for (Partition tmpPart: new_parts) {
         // Set DDL time to now if not specified
@@ -408,9 +423,14 @@ public class HiveAlterHandler implements
           tmpPart.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(System
               .currentTimeMillis() / 1000));
         }
+
         Partition oldTmpPart = msdb.getPartition(dbname, name, tmpPart.getValues());
         oldParts.add(oldTmpPart);
         partValsList.add(tmpPart.getValues());
+
+        if (MetaStoreUtils.requireCalStats(hiveConf, oldTmpPart, tmpPart, tbl)) {
+          MetaStoreUtils.updatePartitionStatsFast(tmpPart, wh, false, true);
+        }
       }
       msdb.alterPartitions(dbname, name, partValsList, new_parts);
     } catch (InvalidObjectException e) {

Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Tue Nov  5 07:01:32 2013
@@ -1029,7 +1029,8 @@ public class HiveMetaStore extends Thrif
 
         ms.openTransaction();
 
-        if (ms.getDatabase(tbl.getDbName()) == null) {
+        Database db = ms.getDatabase(tbl.getDbName());
+        if (db == null) {
           throw new NoSuchObjectException("The database " + tbl.getDbName() + " does not exist");
         }
 
@@ -1063,6 +1064,14 @@ public class HiveMetaStore extends Thrif
             madeDir = true;
           }
         }
+        if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
+            !MetaStoreUtils.isView(tbl)) {
+          if (tbl.getPartitionKeysSize() == 0)  { // Unpartitioned table
+            MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir);
+          } else { // Partitioned table with no partitions.
+            MetaStoreUtils.updateUnpartitionedTableStatsFast(db, tbl, wh, true);
+          }
+        }
 
         // set create time
         long time = System.currentTimeMillis() / 1000;
@@ -1540,6 +1549,11 @@ public class HiveMetaStore extends Thrif
         part.setCreateTime((int) time);
         part.putToParameters(hive_metastoreConstants.DDL_TIME, Long.toString(time));
 
+        if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
+            !MetaStoreUtils.isView(tbl)) {
+          MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir);
+        }
+
         success = ms.addPartition(part);
         if (success) {
           success = ms.commitTransaction();
@@ -1760,6 +1774,11 @@ public class HiveMetaStore extends Thrif
           }
         }
 
+        if (HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER) &&
+            !MetaStoreUtils.isView(tbl)) {
+          MetaStoreUtils.updatePartitionStatsFast(part, wh, madeDir);
+        }
+
         // set create time
         long time = System.currentTimeMillis() / 1000;
         part.setCreateTime((int) time);

Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java Tue Nov  5 07:01:32 2013
@@ -41,13 +41,17 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.JavaUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -140,6 +144,166 @@ public class MetaStoreUtils {
   }
 
   /**
+   * Checks whether a parameter map already carries every "Fast Stats" key.
+   *
+   * @param partParams parameter map of a table or partition; it is dereferenced without a
+   * null check
+   * @return true if every key returned by StatsSetupConst.getStatsFastCollection() is present
+   * in the map, false as soon as one is found missing
+   */
+  public static boolean containsAllFastStats(Map<String, String> partParams) {
+    for (String requiredKey : StatsSetupConst.getStatsFastCollection()) {
+      if (partParams.containsKey(requiredKey)) {
+        continue;
+      }
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Convenience overload that delegates to the five-argument variant with both the
+   * new-directory flag and the force-recompute flag set to false.
+   */
+  public static boolean updateUnpartitionedTableStatsFast(Database db, Table tbl, Warehouse wh)
+      throws MetaException {
+    final boolean newDir = false;
+    final boolean forceRecompute = false;
+    return updateUnpartitionedTableStatsFast(db, tbl, wh, newDir, forceRecompute);
+  }
+
+  /**
+   * Convenience overload that delegates to the five-argument variant, passing madeDir
+   * through and never forcing a recompute.
+   */
+  public static boolean updateUnpartitionedTableStatsFast(Database db, Table tbl, Warehouse wh,
+      boolean madeDir) throws MetaException {
+    final boolean forceRecompute = false;
+    return updateUnpartitionedTableStatsFast(db, tbl, wh, madeDir, forceRecompute);
+  }
+
+  /**
+   * Refreshes the numFiles and totalSize parameters of an unpartitioned Table from a listing
+   * of its files obtained through the Warehouse.
+   * <p>
+   * Nothing is done when the Table already carries all fast stats and no recompute is forced.
+   * When newDir is true no listing is made and no stat values are written, but the (possibly
+   * freshly created) parameter map is still set on the Table and true is returned.
+   * @param db database handed to the warehouse when listing the table's files
+   * @param tbl table whose parameters are updated in place
+   * @param wh warehouse used to list the table's files
+   * @param newDir if true, the directory was just created and can be assumed to be empty
+   * @param forceRecompute Recompute stats even if the passed Table already has
+   * these parameters set
+   * @return true if the Table's parameters were (re)set, false if they were left untouched
+   */
+  public static boolean updateUnpartitionedTableStatsFast(Database db, Table tbl, Warehouse wh,
+      boolean newDir, boolean forceRecompute) throws MetaException {
+    Map<String,String> tblParams = tbl.getParameters();
+    boolean statsAlreadyComplete =
+        !forceRecompute && tblParams != null && containsAllFastStats(tblParams);
+    if (statsAlreadyComplete) {
+      return false;
+    }
+    if (tblParams == null) {
+      tblParams = new HashMap<String,String>();
+    }
+    if (!newDir) {
+      // The table location already exists and may contain data.
+      // Let's try to populate those stats that don't require full scan.
+      LOG.info("Updating table stats fast for " + tbl.getTableName());
+      FileStatus[] files = wh.getFileStatusesForUnpartitionedTable(db, tbl);
+      tblParams.put(StatsSetupConst.NUM_FILES, Integer.toString(files.length));
+      long totalBytes = 0L;
+      for (int i = 0; i < files.length; i++) {
+        totalBytes += files[i].getLen();
+      }
+      String totalBytesText = Long.toString(totalBytes);
+      tblParams.put(StatsSetupConst.TOTAL_SIZE, totalBytesText);
+      LOG.info("Updated size of table " + tbl.getTableName() +" to "+ totalBytesText);
+      if (tblParams.containsKey(StatsSetupConst.ROW_COUNT) ||
+          tblParams.containsKey(StatsSetupConst.RAW_DATA_SIZE)) {
+        // TODO: Add a MetaStore flag indicating accuracy of these stats and update it here.
+      }
+    }
+    tbl.setParameters(tblParams);
+    return true;
+  }
+
+  /**
+   * Decides whether the fast stats of a table or partition need to be (re)calculated.
+   *
+   * @param hiveConf configuration consulted for HIVESTATSAUTOGATHER
+   * @param oldPart the partition as it was before the change, may be null
+   * @param newPart the partition about to be stored, may be null
+   * @param tbl table being altered; no stats are calculated for views
+   * @return false when stats auto-gathering is disabled or tbl is a view; otherwise true when
+   * both partitions are null, when newPart lacks any fast stat, or when a fast stat present
+   * on oldPart has a different numeric value on newPart; false in all remaining cases
+   */
+  public static boolean requireCalStats(Configuration hiveConf, Partition oldPart,
+    Partition newPart, Table tbl) {
+
+    if (!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
+      return false;
+    }
+
+    if (MetaStoreUtils.isView(tbl)) {
+      return false;
+    }
+
+    if  (oldPart == null && newPart == null) {
+      return true;
+    }
+
+    // requires to calculate stats if new partition doesn't have it
+    if ((newPart == null) || (newPart.getParameters() == null)
+        || !containsAllFastStats(newPart.getParameters())) {
+      return true;
+    }
+
+    // requires to calculate stats if new and old have different fast stats
+    if ((oldPart != null) && (oldPart.getParameters() != null)) {
+      for (String stat : StatsSetupConst.getStatsFastCollection()) {
+        if (oldPart.getParameters().containsKey(stat)) {
+          // Compare as primitives. Using '!=' on boxed Long objects compares references,
+          // so equal values outside the small-value cache were reported as different.
+          long oldStat = Long.parseLong(oldPart.getParameters().get(stat));
+          long newStat = Long.parseLong(newPart.getParameters().get(stat));
+          if (oldStat != newStat) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Convenience overload that delegates to the four-argument variant with both the
+   * made-directory flag and the force-recompute flag set to false.
+   */
+  public static boolean updatePartitionStatsFast(Partition part, Warehouse wh)
+      throws MetaException {
+    final boolean madeDir = false;
+    final boolean forceRecompute = false;
+    return updatePartitionStatsFast(part, wh, madeDir, forceRecompute);
+  }
+
+  /**
+   * Convenience overload that delegates to the four-argument variant, passing madeDir
+   * through and never forcing a recompute.
+   */
+  public static boolean updatePartitionStatsFast(Partition part, Warehouse wh, boolean madeDir)
+      throws MetaException {
+    final boolean forceRecompute = false;
+    return updatePartitionStatsFast(part, wh, madeDir, forceRecompute);
+  }
+
+  /**
+   * Updates the numFiles and totalSize parameters for the passed Partition by querying
+   * the warehouse if the passed Partition does not already have values for these parameters.
+   * <p>
+   * When madeDir is true no listing is made and no stat values are written, but the (possibly
+   * freshly created) parameter map is still set on the Partition and true is returned.
+   * @param part partition whose parameters are updated in place
+   * @param wh warehouse used to list the partition's files
+   * @param madeDir if true, the directory was just created and can be assumed to be empty
+   * @param forceRecompute Recompute stats even if the passed Partition already has
+   * these parameters set
+   * @return true if the stats were updated, false otherwise
+   */
+  public static boolean updatePartitionStatsFast(Partition part, Warehouse wh,
+      boolean madeDir, boolean forceRecompute) throws MetaException {
+    Map<String,String> params = part.getParameters();
+    boolean updated = false;
+    if (forceRecompute ||
+        params == null ||
+        !containsAllFastStats(params)) {
+      if (params == null) {
+        params = new HashMap<String,String>();
+      }
+      if (!madeDir) {
+        // The partition location already existed and may contain data. Let's try to
+        // populate those statistics that don't require a full scan of the data.
+        // Logged at INFO, like the unpartitioned-table variant: this is routine progress,
+        // not a warning condition.
+        LOG.info("Updating partition stats fast for: " + part.getTableName());
+        FileStatus[] fileStatus = wh.getFileStatusesForPartition(part);
+        params.put(StatsSetupConst.NUM_FILES, Integer.toString(fileStatus.length));
+        long partSize = 0L;
+        for (FileStatus status : fileStatus) {
+          partSize += status.getLen();
+        }
+        params.put(StatsSetupConst.TOTAL_SIZE, Long.toString(partSize));
+        LOG.info("Updated size to " + Long.toString(partSize));
+        // If ROW_COUNT or RAW_DATA_SIZE are present, the accuracy of these "collectable"
+        // stats at this point is suspect unless we know that StatsTask was just run before
+        // this MetaStore call and populated them.
+        // TODO: Add a MetaStore flag indicating accuracy of these stats and update it here.
+      }
+      part.setParameters(params);
+      updated = true;
+    }
+    return updated;
+  }
+
+  /**
    * getDeserializer
    *
    * Get the Deserializer for a table given its name and properties.
@@ -1122,6 +1286,13 @@ public class MetaStoreUtils {
     return filter.toString();
   }
 
+  /**
+   * Tells whether the passed table is a view.
+   *
+   * @param table table to inspect, may be null
+   * @return true only if table is non-null and its table type equals the string form of
+   * TableType.VIRTUAL_VIEW
+   */
+  public static boolean isView(Table table) {
+    return table != null
+        && TableType.VIRTUAL_VIEW.toString().equals(table.getTableType());
+  }
+
   /**
    * create listener instances as per the configuration.
    *

Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/Warehouse.java Tue Nov  5 07:01:32 2013
@@ -45,11 +45,15 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.common.JavaUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -496,6 +500,63 @@ public class Warehouse {
   }
 
   /**
+   * Lists the files that make up a partition, starting at the location recorded in the
+   * partition's storage descriptor. The recursion level handed to
+   * HiveStatsUtils.getFileStatusRecurse is 1 plus the list-bucketing depth of the partition.
+   * An IOException is handed to MetaStoreUtils.logAndThrowMetaException.
+   * @param partn partition whose files are listed
+   * @return array of FileStatus objects corresponding to the files making up the passed partition
+   */
+  public FileStatus[] getFileStatusesForPartition(Partition partn)
+      throws MetaException {
+    FileStatus[] statuses = null;
+    try {
+      Path partLocation = new Path(partn.getSd().getLocation());
+      FileSystem fs = partLocation.getFileSystem(conf);
+      /* consider sub-directory created from list bucketing. */
+      int recurseLevel = 1 + calculateListBucketingDMLDepth(partn);
+      statuses = HiveStatsUtils.getFileStatusRecurse(partLocation, recurseLevel, fs);
+    } catch (IOException ioe) {
+      MetaStoreUtils.logAndThrowMetaException(ioe);
+    }
+    return statuses;
+  }
+
+  /**
+   * List bucketing will introduce sub-directories.
+   * Calculates how many extra directory levels exist so that the caller can go down to
+   * the leaf directory and count the right number of files.
+   * @param partn partition whose storage descriptor is inspected
+   * @return the number of skewed column names when skewed column names, skewed column values
+   * and skewed value-location maps are all present and non-empty; 0 otherwise
+   */
+  private static int calculateListBucketingDMLDepth(Partition partn) {
+    SkewedInfo skew = partn.getSd().getSkewedInfo();
+    if (skew == null) {
+      return 0;
+    }
+    if (skew.getSkewedColNames() == null || skew.getSkewedColNames().isEmpty()) {
+      return 0;
+    }
+    if (skew.getSkewedColValues() == null || skew.getSkewedColValues().isEmpty()) {
+      return 0;
+    }
+    if (skew.getSkewedColValueLocationMaps() == null
+        || skew.getSkewedColValueLocationMaps().isEmpty()) {
+      return 0;
+    }
+    // list bucketing will introduce more files
+    return skew.getSkewedColNames().size();
+  }
+
+  /**
+   * Lists the files that make up an unpartitioned table. The table path is resolved with
+   * getTablePath from the database and the table name, and is listed with a recursion level
+   * of 1. An IOException is handed to MetaStoreUtils.logAndThrowMetaException.
+   * @param db database used to resolve the table path
+   * @param table table whose files are listed
+   * @return array of FileStatus objects corresponding to the files making up the passed
+   * unpartitioned table
+   */
+  public FileStatus[] getFileStatusesForUnpartitionedTable(Database db, Table table)
+      throws MetaException {
+    Path tableLocation = getTablePath(db, table.getTableName());
+    FileStatus[] statuses = null;
+    try {
+      FileSystem fs = tableLocation.getFileSystem(conf);
+      statuses = HiveStatsUtils.getFileStatusRecurse(tableLocation, 1, fs);
+    } catch (IOException ioe) {
+      MetaStoreUtils.logAndThrowMetaException(ioe);
+    }
+    return statuses;
+  }
+
+  /**
    * Makes a valid partition name.
    * @param partCols The partition columns
    * @param vals The partition values

Modified: hive/branches/tez/odbc/Makefile
URL: http://svn.apache.org/viewvc/hive/branches/tez/odbc/Makefile?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/odbc/Makefile (original)
+++ hive/branches/tez/odbc/Makefile Tue Nov  5 07:01:32 2013
@@ -53,7 +53,7 @@ SHELL = /bin/sh
 LIBTOOL = $(SHELL) /usr/bin/libtool
 LINK = ln -sf
 
-BUILD_DIR = $(HIVE_ROOT)/build
+BUILD_DIR = $(BASE_DIR)/target
 ODBC_BUILD_DIR = $(BUILD_DIR)/odbc
 OBJ_SERVICE_BUILD_DIR = $(BUILD_DIR)/service/objs
 OBJ_QL_BUILD_DIR = $(BUILD_DIR)/ql/objs
@@ -73,7 +73,7 @@ SHLIB_VERSION = 1.0.0
 SO_LINK_NAME = lib$(LIB_NAME).so
 SO_NAME = $(SO_LINK_NAME).$(SHLIB_VERSION)
 SO_LINK_TARGET = $(LIB_ODBC_BUILD_DIR)/$(SO_LINK_NAME)
-SO_TARGET = $(LIB_ODBC_BUILD_DIR)/$(SO_NAME) 
+SO_TARGET = $(LIB_ODBC_BUILD_DIR)/$(SO_NAME)
 SO_INSTALL_LINK_TARGET = $(INSTALL_LIB_PATH)/$(SO_LINK_NAME)
 SO_INSTALL_TARGET = $(INSTALL_LIB_PATH)/$(SO_NAME)
 AR_NAME = lib$(LIB_NAME).a

Copied: hive/branches/tez/pom.xml (from r1538724, hive/trunk/pom.xml)
URL: http://svn.apache.org/viewvc/hive/branches/tez/pom.xml?p2=hive/branches/tez/pom.xml&p1=hive/trunk/pom.xml&r1=1538724&r2=1538880&rev=1538880&view=diff
==============================================================================
--- hive/trunk/pom.xml (original)
+++ hive/branches/tez/pom.xml Tue Nov  5 07:01:32 2013
@@ -76,6 +76,7 @@
     <commons-httpclient.version>3.0.1</commons-httpclient.version>
     <commons-io.version>2.4</commons-io.version>
     <commons-lang.version>2.4</commons-lang.version>
+    <commons-lang3.version>3.1</commons-lang3.version>
     <commons-logging.version>1.0.4</commons-logging.version>
     <commons-pool.version>1.5.4</commons-pool.version>
     <derby.version>10.4.2.0</derby.version>
@@ -83,7 +84,7 @@
     <groovy.version>2.1.6</groovy.version>
     <hadoop-20.version>0.20.2</hadoop-20.version>
     <hadoop-20S.version>1.1.2</hadoop-20S.version>
-    <hadoop-23.version>2.0.5-alpha</hadoop-23.version>
+    <hadoop-23.version>2.2.0</hadoop-23.version>
     <hbase.version>0.94.6.1</hbase.version>
     <jackson.version>1.9.2</jackson.version>
     <javaewah.version>0.3.2</javaewah.version>
@@ -109,6 +110,7 @@
     <rat.version>0.8</rat.version>
     <slf4j.version>1.6.1</slf4j.version>
     <ST4.version>4.0.4</ST4.version>
+    <tez.version>0.2.0-SNAPSHOT</tez.version>
     <tempus-fugit.version>1.1</tempus-fugit.version>
     <snappy.version>0.2</snappy.version>
     <wadl-resourcedoc-doclet.version>1.4</wadl-resourcedoc-doclet.version>

Copied: hive/branches/tez/ql/pom.xml (from r1538724, hive/trunk/ql/pom.xml)
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/pom.xml?p2=hive/branches/tez/ql/pom.xml&p1=hive/trunk/ql/pom.xml&r1=1538724&r2=1538880&rev=1538880&view=diff
==============================================================================
--- hive/trunk/ql/pom.xml (original)
+++ hive/branches/tez/ql/pom.xml Tue Nov  5 07:01:32 2013
@@ -70,6 +70,11 @@
       <version>${commons-io.version}</version>
     </dependency>
     <dependency>
+       <groupId>org.apache.commons</groupId>
+       <artifactId>commons-lang3</artifactId>
+       <version>${commons-lang3.version}</version>
+  </dependency>
+    <dependency>
       <groupId>commons-logging</groupId>
       <artifactId>commons-logging</artifactId>
       <version>${commons-logging.version}</version>
@@ -218,6 +223,59 @@
           <version>${hadoop-23.version}</version>
           <optional>true</optional>
         </dependency>
+        <dependency> 
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-api</artifactId>
+          <version>${tez.version}</version>
+        </dependency>
+        <dependency> 
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-dag</artifactId>
+          <version>${tez.version}</version>
+        </dependency>
+        <dependency >
+           <groupId>org.apache.tez</groupId>
+           <artifactId>tez-common</artifactId>
+           <version>${tez.version}</version>
+        </dependency>
+        <dependency> 
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-runtime-library</artifactId>
+          <version>${tez.version}</version>
+        </dependency>
+        <dependency> 
+          <groupId>org.apache.tez</groupId>
+          <artifactId>tez-mapreduce</artifactId>
+          <version>${tez.version}</version>
+        </dependency>
+        <dependency>
+           <groupId>org.apache.tez</groupId>
+           <artifactId>tez-runtime-internals</artifactId>
+           <version>${tez.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-hdfs</artifactId>
+          <version>${hadoop-23.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-api</artifactId>
+          <version>${hadoop-23.version}</version>
+          <optional>true</optional>
+       </dependency>
+       <dependency>
+         <groupId>org.apache.hadoop</groupId>
+         <artifactId>hadoop-yarn-common</artifactId>
+         <version>${hadoop-23.version}</version>
+         <optional>true</optional>
+       </dependency>
+       <dependency>
+         <groupId>org.apache.hadoop</groupId>
+         <artifactId>hadoop-yarn-client</artifactId>
+         <version>${hadoop-23.version}</version>
+         <optional>true</optional>
+       </dependency>
       </dependencies>
     </profile>
     <profile>

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Tue Nov  5 07:01:32 2013
@@ -498,6 +498,13 @@ public class FetchOperator implements Se
    * Currently only used by FetchTask.
    **/
   public boolean pushRow() throws IOException, HiveException {
+    if(work.getRowsComputedUsingStats() != null) {
+      for (List<Object> row : work.getRowsComputedUsingStats()) {
+        operator.process(row, 0);
+      }
+      operator.flush();
+      return true;
+    }
     InspectableObject row = getNextRow();
     if (row != null) {
       pushRow(row);
@@ -609,6 +616,9 @@ public class FetchOperator implements Se
    * returns output ObjectInspector, never null
    */
   public ObjectInspector getOutputObjectInspector() throws HiveException {
+    if(null != work.getStatRowOI()) {
+      return work.getStatRowOI();
+    }
     try {
       if (work.isNotPartitioned()) {
         return getRowInspectorFromTable(work.getTblDesc());

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Nov  5 07:01:32 2013
@@ -34,6 +34,8 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
@@ -51,7 +53,6 @@ import org.apache.hadoop.hive.ql.plan.Pl
 import org.apache.hadoop.hive.ql.plan.SkewedColumnPositionPair;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.Serializer;
@@ -836,7 +837,7 @@ public class FileSinkOperator extends Te
       if (conf.isLinkedFileSink()) {
         level++;
       }
-      FileStatus[] status = Utilities.getFileStatusRecurse(tmpPath, level, fs);
+      FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(tmpPath, level, fs);
       sb.append("Sample of ")
         .append(Math.min(status.length, 100))
         .append(" partitions created under ")

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java Tue Nov  5 07:01:32 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
@@ -194,11 +195,11 @@ public class MoveTask extends Task<MoveW
     }
   }
 
-
   @Override
   public int execute(DriverContext driverContext) {
 
     try {
+
       // Do any hive related operations like moving tables and files
       // to appropriate locations
       LoadFileDesc lfd = work.getLoadFileWork();
@@ -460,7 +461,7 @@ public class MoveTask extends Task<MoveW
     boolean updateBucketCols = false;
     if (bucketCols != null) {
       FileSystem fileSys = partn.getPartitionPath().getFileSystem(conf);
-      FileStatus[] fileStatus = Utilities.getFileStatusRecurse(
+      FileStatus[] fileStatus = HiveStatsUtils.getFileStatusRecurse(
           partn.getPartitionPath(), 1, fileSys);
       // Verify the number of buckets equals the number of files
       // This will not hold for dynamic partitions where not every reducer produced a file for

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Nov  5 07:01:32 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
@@ -90,9 +91,8 @@ public class ReduceSinkOperator extends 
     return inputAlias;
   }
 
-  // picks topN K:V pairs from input. can be null
-  private transient TopNHash reducerHash;
-
+  // picks topN K:V pairs from input.
+  protected transient TopNHash reducerHash = new TopNHash();
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
 
@@ -136,7 +136,11 @@ public class ReduceSinkOperator extends 
           .newInstance();
       valueSerializer.initialize(null, valueTableDesc.getProperties());
 
-      reducerHash = createTopKHash();
+      int limit = conf.getTopN();
+      float memUsage = conf.getTopNMemoryUsage();
+      if (limit >= 0 && memUsage > 0) {
+        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
+      }
 
       firstRow = true;
       initializeChildren(hconf);
@@ -146,26 +150,8 @@ public class ReduceSinkOperator extends 
     }
   }
 
-  private TopNHash createTopKHash() {
-    int limit = conf.getTopN();
-    float percent = conf.getTopNMemoryUsage();
-    if (limit < 0 || percent <= 0) {
-      return null;
-    }
-    if (limit == 0) {
-      return TopNHash.create0();
-    }
-    // limit * 64 : compensation of arrays for key/value/hashcodes
-    long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
-    if (threshold < 0) {
-      return null;
-    }
-    return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
-  }
-
   transient InspectableObject tempInspectableObject = new InspectableObject();
   protected transient HiveKey keyWritable = new HiveKey();
-  protected transient Writable value;
 
   transient StructObjectInspector keyObjectInspector;
   transient StructObjectInspector valueObjectInspector;
@@ -214,6 +200,7 @@ public class ReduceSinkOperator extends 
 
     if (outputColNames.size() > length) {
       // union keys
+      assert distinctColIndices != null;
       List<ObjectInspector> uois = new ArrayList<ObjectInspector>();
       for (List<Integer> distinctCols : distinctColIndices) {
         List<String> names = new ArrayList<String>();
@@ -240,6 +227,9 @@ public class ReduceSinkOperator extends 
       ObjectInspector rowInspector = inputObjInspectors[tag];
       if (firstRow) {
         firstRow = false;
+        // TODO: this is fishy - we init object inspectors based on first tag. We
+        //       should either init for each tag, or if rowInspector doesn't really
+        //       matter, then we can create this in ctor and get rid of firstRow.
         keyObjectInspector = initEvaluatorsAndReturnStruct(keyEval,
             distinctColIndices,
             conf.getOutputKeyColumnNames(), numDistributionKeys, rowInspector);
@@ -253,32 +243,6 @@ public class ReduceSinkOperator extends 
         cachedValues = new Object[valueEval.length];
       }
 
-      // Evaluate the HashCode
-      int keyHashCode = 0;
-      if (partitionEval.length == 0) {
-        // If no partition cols, just distribute the data uniformly to provide
-        // better
-        // load balance. If the requirement is to have a single reducer, we
-        // should set
-        // the number of reducers to 1.
-        // Use a constant seed to make the code deterministic.
-        if (random == null) {
-          random = new Random(12345);
-        }
-        keyHashCode = random.nextInt();
-      } else {
-        for (int i = 0; i < partitionEval.length; i++) {
-          Object o = partitionEval[i].evaluate(row);
-          keyHashCode = keyHashCode * 31
-              + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
-        }
-      }
-
-      // Evaluate the value
-      for (int i = 0; i < valueEval.length; i++) {
-        cachedValues[i] = valueEval[i].evaluate(row);
-      }
-
       // Evaluate the keys
       for (int i = 0; i < numDistributionKeys; i++) {
         cachedKeys[0][i] = keyEval[i].evaluate(row);
@@ -303,64 +267,21 @@ public class ReduceSinkOperator extends 
         }
       }
 
-      BytesWritable value = null;
-      // Serialize the keys and append the tag
       for (int i = 0; i < cachedKeys.length; i++) {
-        if (keyIsText) {
-          Text key = (Text) keySerializer.serialize(cachedKeys[i],
-              keyObjectInspector);
-          if (tag == -1) {
-            keyWritable.set(key.getBytes(), 0, key.getLength());
-          } else {
-            int keyLength = key.getLength();
-            keyWritable.setSize(keyLength + 1);
-            System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-            keyWritable.get()[keyLength] = tagByte[0];
-          }
-        } else {
-          // Must be BytesWritable
-          BytesWritable key = (BytesWritable) keySerializer.serialize(
-              cachedKeys[i], keyObjectInspector);
-          if (tag == -1) {
-            keyWritable.set(key.getBytes(), 0, key.getLength());
-          } else {
-            int keyLength = key.getLength();
-            keyWritable.setSize(keyLength + 1);
-            System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
-            keyWritable.get()[keyLength] = tagByte[0];
-          }
-        }
-        keyWritable.setHashCode(keyHashCode);
-
-        if (reducerHash == null) {
-          if (null != out) {
-            collect(keyWritable, value = getValue(row, value));
-          }
-       } else {
-          int index = reducerHash.indexOf(keyWritable);
-          if (index == TopNHash.EXCLUDED) {
-            continue;
-          }
-          value = getValue(row, value);
-          if (index >= 0) {
-            reducerHash.set(index, value);
-          } else {
-            if (index == TopNHash.FORWARD) {
-              collect(keyWritable, value);
-            } else if (index == TopNHash.FLUSH) {
-              LOG.info("Top-N hash is flushed");
-              reducerHash.flush();
-              // we can now retry adding key/value into hash, which is flushed.
-              // but for simplicity, just forward them
-              collect(keyWritable, value);
-            } else if (index == TopNHash.DISABLE) {
-              LOG.info("Top-N hash is disabled");
-              reducerHash.flush();
-              collect(keyWritable, value);
-              reducerHash = null;
-            }
-          }
+        // Serialize the keys and append the tag
+        Object keyObj = keySerializer.serialize(cachedKeys[i], keyObjectInspector);
+        setKeyWritable(keyIsText ? (Text)keyObj : (BytesWritable)keyObj, tag);
+        int topNIndex = reducerHash.tryStoreKey(keyWritable);
+        if (TopNHash.EXCLUDED == topNIndex) continue;
+        int keyHashCode = computeHashCode(row);
+        BytesWritable valueWritable = getValue(row);
+        if (TopNHash.FORWARD == topNIndex) {
+          keyWritable.setHashCode(keyHashCode);
+          collect(keyWritable, valueWritable);
+          continue;
         }
+        assert topNIndex >= 0;
+        reducerHash.storeValue(topNIndex, valueWritable, keyHashCode, false);
       }
     } catch (HiveException e) {
       throw e;
@@ -369,10 +290,58 @@ public class ReduceSinkOperator extends 
     }
   }
 
-  public void collect(BytesWritable key, BytesWritable value) throws IOException {
+  private int computeHashCode(Object row) throws HiveException {
+    // Evaluate the HashCode
+    int keyHashCode = 0;
+    if (partitionEval.length == 0) {
+      // If no partition cols, just distribute the data uniformly to provide
+      // better
+      // load balance. If the requirement is to have a single reducer, we
+      // should set
+      // the number of reducers to 1.
+      // Use a constant seed to make the code deterministic.
+      if (random == null) {
+        random = new Random(12345);
+      }
+      keyHashCode = random.nextInt();
+    } else {
+      for (int i = 0; i < partitionEval.length; i++) {
+        Object o = partitionEval[i].evaluate(row);
+        keyHashCode = keyHashCode * 31
+            + ObjectInspectorUtils.hashCode(o, partitionObjectInspectors[i]);
+      }
+    }
+    return keyHashCode;
+  }
+
+  protected void setKeyWritable(BinaryComparable key, int tag) {
+    if (tag == -1) {
+      keyWritable.set(key.getBytes(), 0, key.getLength());
+    } else {
+      int keyLength = key.getLength();
+      keyWritable.setSize(keyLength + 1);
+      System.arraycopy(key.getBytes(), 0, keyWritable.get(), 0, keyLength);
+      keyWritable.get()[keyLength] = tagByte[0];
+    }
+  }
+
+  public void collect(byte[] key, byte[] value, int hash) throws IOException {
+    HiveKey keyWritable = new HiveKey(key, hash);
+    BytesWritable valueWritable = new BytesWritable(value);
+    collect(keyWritable, valueWritable);
+  }
+
+  protected void collect(byte[] key, Writable valueWritable, int hash) throws IOException {
+    HiveKey keyWritable = new HiveKey(key, hash);
+    collect(keyWritable, valueWritable);
+  }
+
+  protected void collect(BytesWritable keyWritable, Writable valueWritable) throws IOException {
     // Since this is a terminal operator, update counters explicitly -
     // forward is not called
-    out.collect(key, value);
+    if (null != out) {
+      out.collect(keyWritable, valueWritable);
+    }
     if (++outputRows % 1000 == 0) {
       if (counterNameToEnum != null) {
         incrCounter(numOutputRowsCntr, outputRows);
@@ -382,11 +351,7 @@ public class ReduceSinkOperator extends 
     }
   }
 
-  // evaluate value lazily
-  private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
-    if (value != null) {
-      return value;
-    }
+  private BytesWritable getValue(Object row) throws Exception {
     // Evaluate the value
     for (int i = 0; i < valueEval.length; i++) {
       cachedValues[i] = valueEval[i].evaluate(row);
@@ -397,16 +362,9 @@ public class ReduceSinkOperator extends 
 
   @Override
   protected void closeOp(boolean abort) throws HiveException {
-    if (!abort && reducerHash != null) {
-      try {
-        reducerHash.flush();
-      } catch (IOException e) {
-        throw new HiveException(e);
-      } finally {
-        reducerHash = null;
-      }
+    if (!abort) {
+      reducerHash.flush();
     }
-    reducerHash = null;
     super.closeOp(abort);
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/StatsTask.java Tue Nov  5 07:01:32 2013
@@ -19,23 +19,19 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.Serializable;
-import java.net.URI;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.ql.DriverContext;
@@ -43,7 +39,6 @@ import org.apache.hadoop.hive.ql.ErrorMs
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
-import org.apache.hadoop.hive.ql.optimizer.listbucketingpruner.ListBucketingPrunerUtils;
 import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer.tableSpec;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.LoadTableDesc;
@@ -52,12 +47,16 @@ import org.apache.hadoop.hive.ql.plan.ap
 import org.apache.hadoop.hive.ql.stats.StatsAggregator;
 import org.apache.hadoop.hive.ql.stats.StatsFactory;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.util.StringUtils;
 
 /**
- * StatsTask implementation.
+ * StatsTask implementation. StatsTask mainly deals with "collectable" stats. These are
+ * stats that require data scanning and are collected during query execution (unless the user
+ * explicitly requests data scanning just for the purpose of stats computation using the "ANALYZE"
+ * command). All other stats are computed directly by the MetaStore. The rationale is that the
+ * MetaStore layer covers all Thrift calls and provides better guarantees about the accuracy of
+ * those stats.
  **/
 public class StatsTask extends Task<StatsWork> implements Serializable {
 
@@ -67,25 +66,8 @@ public class StatsTask extends Task<Stat
   private Table table;
   private List<LinkedHashMap<String, String>> dpPartSpecs;
 
-  private static final List<String> supportedStats = new ArrayList<String>();
-  private static final List<String> collectableStats = new ArrayList<String>();
-  private static final Map<String, String> nameMapping = new HashMap<String, String>();
-  static {
-    // supported statistics
-    supportedStats.add(StatsSetupConst.NUM_FILES);
-    supportedStats.add(StatsSetupConst.ROW_COUNT);
-    supportedStats.add(StatsSetupConst.TOTAL_SIZE);
-    supportedStats.add(StatsSetupConst.RAW_DATA_SIZE);
-
-    // statistics that need to be collected throughout the execution
-    collectableStats.add(StatsSetupConst.ROW_COUNT);
-    collectableStats.add(StatsSetupConst.RAW_DATA_SIZE);
-
-    nameMapping.put(StatsSetupConst.NUM_FILES, "num_files");
-    nameMapping.put(StatsSetupConst.ROW_COUNT, "num_rows");
-    nameMapping.put(StatsSetupConst.TOTAL_SIZE, "total_size");
-    nameMapping.put(StatsSetupConst.RAW_DATA_SIZE, "raw_data_size");
-  }
+  private static final List<String> collectableStats = StatsSetupConst.getStatsToBeCollected();
+  private static final List<String> supportedStats = StatsSetupConst.getSupportedStats();
 
   public StatsTask() {
     super();
@@ -94,20 +76,20 @@ public class StatsTask extends Task<Stat
 
   /**
    *
-   * Partition Level Statistics.
+   * Statistics for a Partition or Unpartitioned Table
    *
    */
-  class PartitionStatistics {
+  class Statistics {
     Map<String, LongWritable> stats;
 
-    public PartitionStatistics() {
+    public Statistics() {
       stats = new HashMap<String, LongWritable>();
       for (String statType : supportedStats) {
         stats.put(statType, new LongWritable(0L));
       }
     }
 
-    public PartitionStatistics(Map<String, Long> st) {
+    public Statistics(Map<String, Long> st) {
       stats = new HashMap<String, LongWritable>();
       for (String statType : st.keySet()) {
         Long stValue = st.get(statType) == null ? 0L : st.get(statType);
@@ -126,86 +108,7 @@ public class StatsTask extends Task<Stat
 
     @Override
     public String toString() {
-      StringBuilder sb = new StringBuilder();
-      for (String statType : supportedStats) {
-        sb.append(nameMapping.get(statType)).append(": ").append(stats.get(statType)).append(", ");
-      }
-      sb.delete(sb.length() - 2, sb.length());
-      return sb.toString();
-    }
-  }
-
-  /**
-   * Table Level Statistics.
-   */
-  class TableStatistics extends PartitionStatistics {
-    int numPartitions; // number of partitions
-
-    public TableStatistics() {
-      super();
-      numPartitions = 0;
-    }
-
-    public void setNumPartitions(int np) {
-      numPartitions = np;
-    }
-
-    public int getNumPartitions() {
-      return numPartitions;
-    }
-
-    /**
-     * Incrementally update the table statistics according to the old and new
-     * partition level statistics.
-     *
-     * @param oldStats
-     *          The old statistics of a partition.
-     * @param newStats
-     *          The new statistics of a partition.
-     */
-    public void updateStats(PartitionStatistics oldStats, PartitionStatistics newStats) {
-      deletePartitionStats(oldStats);
-      addPartitionStats(newStats);
-    }
-
-    /**
-     * Update the table level statistics when a new partition is added.
-     *
-     * @param newStats
-     *          the new partition statistics.
-     */
-    public void addPartitionStats(PartitionStatistics newStats) {
-      for (String statType : supportedStats) {
-        LongWritable value = stats.get(statType);
-        if (value == null) {
-          stats.put(statType, new LongWritable(newStats.getStat(statType)));
-        } else {
-          value.set(value.get() + newStats.getStat(statType));
-        }
-      }
-      this.numPartitions++;
-    }
-
-    /**
-     * Update the table level statistics when an old partition is dropped.
-     *
-     * @param oldStats
-     *          the old partition statistics.
-     */
-    public void deletePartitionStats(PartitionStatistics oldStats) {
-      for (String statType : supportedStats) {
-        LongWritable value = stats.get(statType);
-        value.set(value.get() - oldStats.getStat(statType));
-      }
-      this.numPartitions--;
-    }
-
-    @Override
-    public String toString() {
-      StringBuilder sb = new StringBuilder();
-      sb.append("num_partitions: ").append(numPartitions).append(", ");
-      sb.append(super.toString());
-      return sb.toString();
+      return org.apache.commons.lang.StringUtils.join(supportedStats, ", ");
     }
   }
 
@@ -297,7 +200,7 @@ public class StatsTask extends Task<Stat
         }
       }
 
-      TableStatistics tblStats = new TableStatistics();
+      Statistics tblStats = new Statistics();
 
       org.apache.hadoop.hive.metastore.api.Table tTable = table.getTTable();
       Map<String, String> parameters = tTable.getParameters();
@@ -310,10 +213,6 @@ public class StatsTask extends Task<Stat
         }
       }
 
-      if (parameters.containsKey(StatsSetupConst.NUM_PARTITIONS)) {
-        tblStats.setNumPartitions(Integer.parseInt(parameters.get(StatsSetupConst.NUM_PARTITIONS)));
-      }
-
       List<Partition> partitions = getPartitionsList();
       boolean atomic = HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_STATS_ATOMIC);
       int maxPrefixLength = HiveConf.getIntVar(conf,
@@ -324,10 +223,6 @@ public class StatsTask extends Task<Stat
         if (!tableStatsExist && atomic) {
           return 0;
         }
-        long[] summary = summary(conf, table);
-        tblStats.setStat(StatsSetupConst.NUM_FILES, summary[0]);
-        tblStats.setStat(StatsSetupConst.TOTAL_SIZE, summary[1]);
-
         // In case of a non-partitioned table, the key for stats temporary store is "rootDir"
         if (statsAggregator != null) {
           String aggKey = Utilities.getHashedStatsPrefix(work.getAggKey(), maxPrefixLength);
@@ -344,6 +239,19 @@ public class StatsTask extends Task<Stat
             }
           }
         }
+
+        // write table stats to metastore
+        parameters = tTable.getParameters();
+        for (String statType : collectableStats) {
+          parameters.put(statType, Long.toString(tblStats.getStat(statType)));
+        }
+        tTable.setParameters(parameters);
+
+        String tableFullName = table.getDbName() + "." + table.getTableName();
+
+        db.alterTable(tableFullName, new Table(tTable));
+
+        console.printInfo("Table " + tableFullName + " stats: [" + tblStats.toString() + ']');
       } else {
         // Partitioned table:
         // Need to get the old stats of the partition
@@ -370,7 +278,7 @@ public class StatsTask extends Task<Stat
           //
           // get the new partition stats
           //
-          PartitionStatistics newPartStats = new PartitionStatistics();
+          Statistics newPartStats = new Statistics();
 
           // In that case of a partition, the key for stats temporary store is
           // "rootDir/[dynamic_partition_specs/]%"
@@ -398,16 +306,16 @@ public class StatsTask extends Task<Stat
             }
           }
 
-          long[] summary = summary(conf, partn);
-          newPartStats.setStat(StatsSetupConst.NUM_FILES, summary[0]);
-          newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, summary[1]);
-
-          if (hasStats) {
-            PartitionStatistics oldPartStats = new PartitionStatistics(currentValues);
-            tblStats.updateStats(oldPartStats, newPartStats);
-          } else {
-            tblStats.addPartitionStats(newPartStats);
+          /**
+           * calculate fast statistics
+           */
+          FileStatus[] partfileStatus = wh.getFileStatusesForPartition(tPart);
+          newPartStats.setStat(StatsSetupConst.NUM_FILES, partfileStatus.length);
+          long partSize = 0L;
+          for (int i = 0; i < partfileStatus.length; i++) {
+            partSize += partfileStatus[i].getLen();
           }
+          newPartStats.setStat(StatsSetupConst.TOTAL_SIZE, partSize);
 
           //
           // update the metastore
@@ -429,22 +337,6 @@ public class StatsTask extends Task<Stat
 
       }
 
-      //
-      // write table stats to metastore
-      //
-      parameters = tTable.getParameters();
-      for (String statType : supportedStats) {
-        parameters.put(statType, Long.toString(tblStats.getStat(statType)));
-      }
-      parameters.put(StatsSetupConst.NUM_PARTITIONS, Integer.toString(tblStats.getNumPartitions()));
-      tTable.setParameters(parameters);
-
-      String tableFullName = table.getDbName() + "." + table.getTableName();
-
-      db.alterTable(tableFullName, new Table(tTable));
-
-      console.printInfo("Table " + tableFullName + " stats: [" + tblStats.toString() + ']');
-
     } catch (Exception e) {
       console.printInfo("[Warning] could not update stats.",
           "Failed with exception " + e.getMessage() + "\n"
@@ -464,105 +356,6 @@ public class StatsTask extends Task<Stat
     return ret;
   }
 
-  private long[] summary(HiveConf conf, Partition partn) throws IOException {
-    Path path = partn.getPartitionPath();
-    FileSystem fs = path.getFileSystem(conf);
-    List<String> skewedColNames = partn.getSkewedColNames();
-    if (skewedColNames == null || skewedColNames.isEmpty()) {
-      return summary(fs, path);
-    }
-    List<List<String>> skewColValues = table.getSkewedColValues();
-    if (skewColValues == null || skewColValues.isEmpty()) {
-      return summary(fs, toDefaultLBPath(path));
-    }
-    return summary(fs, path, skewedColNames);
-  }
-
-  private long[] summary(HiveConf conf, Table table) throws IOException {
-    Path path = table.getPath();
-    FileSystem fs = path.getFileSystem(conf);
-    List<String> skewedColNames = table.getSkewedColNames();
-    if (skewedColNames == null || skewedColNames.isEmpty()) {
-      return summary(fs, path);
-    }
-    List<List<String>> skewColValues = table.getSkewedColValues();
-    if (skewColValues == null || skewColValues.isEmpty()) {
-      return summary(fs, toDefaultLBPath(path));
-    }
-    return summary(fs, path, table.getSkewedColNames());
-  }
-
-  private Path toDefaultLBPath(Path path) {
-    return new Path(path, ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME);
-  }
-
-  private long[] summary(FileSystem fs, Path path) throws IOException {
-    try {
-      FileStatus status = fs.getFileStatus(path);
-      if (!status.isDir()) {
-        return new long[] {1, status.getLen()};
-      }
-    } catch (FileNotFoundException e) {
-      return new long[] {0, 0};
-    }
-    FileStatus[] children = fs.listStatus(path);  // can be null
-    if (children == null) {
-      return new long[] {0, 0};
-    }
-    long numFiles = 0L;
-    long tableSize = 0L;
-    for (FileStatus child : children) {
-      if (!child.isDir()) {
-        tableSize += child.getLen();
-        numFiles++;
-      }
-    }
-    return new long[] {numFiles, tableSize};
-  }
-
-  private Pattern toPattern(List<String> skewCols) {
-    StringBuilder builder = new StringBuilder();
-    for (String skewCol : skewCols) {
-      if (builder.length() > 0) {
-        builder.append(Path.SEPARATOR_CHAR);
-      }
-      builder.append(skewCol).append('=');
-      builder.append("[^").append(Path.SEPARATOR_CHAR).append("]*");
-    }
-    builder.append(Path.SEPARATOR_CHAR);
-    builder.append("[^").append(Path.SEPARATOR_CHAR).append("]*$");
-    return Pattern.compile(builder.toString());
-  }
-
-  private long[] summary(FileSystem fs, Path path, List<String> skewCols) throws IOException {
-    long numFiles = 0L;
-    long tableSize = 0L;
-    Pattern pattern = toPattern(skewCols);
-    for (FileStatus status : Utilities.getFileStatusRecurse(path, skewCols.size() + 1, fs)) {
-      if (status.isDir()) {
-        continue;
-      }
-      String relative = toRelativePath(path, status.getPath());
-      if (relative == null) {
-        continue;
-      }
-      if (relative.startsWith(ListBucketingPrunerUtils.HIVE_LIST_BUCKETING_DEFAULT_DIR_NAME) ||
-        pattern.matcher(relative).matches()) {
-        tableSize += status.getLen();
-        numFiles++;
-      }
-    }
-    return new long[] {numFiles, tableSize};
-  }
-
-  private String toRelativePath(Path path1, Path path2) {
-    URI relative = path1.toUri().relativize(path2.toUri());
-    if (relative == path2.toUri()) {
-      return null;
-    }
-    return relative.getPath();
-  }
-
   private boolean existStats(Map<String, String> parameters) {
     return parameters.containsKey(StatsSetupConst.ROW_COUNT)
         || parameters.containsKey(StatsSetupConst.NUM_FILES)
@@ -571,7 +364,7 @@ public class StatsTask extends Task<Stat
         || parameters.containsKey(StatsSetupConst.NUM_PARTITIONS);
   }
 
-  private void updateStats(List<String> statsList, PartitionStatistics stats,
+  private void updateStats(List<String> statsList, Statistics stats,
       StatsAggregator statsAggregator, Map<String, String> parameters,
       String aggKey, boolean atomic) throws HiveException {
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TableScanOperator.java Tue Nov  5 07:01:32 2013
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.metadata.VirtualColumn;
@@ -37,7 +38,6 @@ import org.apache.hadoop.hive.ql.plan.Ta
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.stats.StatsPublisher;
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/TopNHash.java Tue Nov  5 07:01:32 2013
@@ -20,49 +20,65 @@ package org.apache.hadoop.hive.ql.exec;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
 import java.util.SortedSet;
+import java.util.TreeMap;
 import java.util.TreeSet;
 
 import com.google.common.collect.MinMaxPriorityQueue;
-import org.apache.hadoop.hive.ql.io.HiveKey;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.io.BinaryComparable;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.mapred.OutputCollector;
 
 /**
  * Stores binary key/value in sorted manner to get top-n key/value
+ * TODO: rename to TopNHeap?
  */
-abstract class TopNHash {
+public class TopNHash {
+  public static Log LOG = LogFactory.getLog(TopNHash.class);
 
   /**
    * For interaction between operator and top-n hash.
    * Currently only used to forward key/values stored in hash.
    */
-  public static interface BinaryCollector extends OutputCollector<BytesWritable, BytesWritable> {
+  public static interface BinaryCollector {
+    public void collect(byte[] key, byte[] value, int hash) throws IOException;
   }
 
-  protected static final int FORWARD = -1;
-  protected static final int EXCLUDED = -2;
-  protected static final int FLUSH = -3;
-  protected static final int DISABLE = -4;
+  public static final int FORWARD = -1;
+  public static final int EXCLUDED = -2;
+  private static final int FLUSH = -3;
+  private static final int DISABLE = -4;
+  private static final int MAY_FORWARD = -5;
+
+  private BinaryCollector collector;
+  private int topN;
 
-  protected final int topN;
-  protected final BinaryCollector collector;
+  private long threshold;   // max heap size
+  private long usage;
 
-  protected final long threshold;   // max heap size
-  protected long usage;             // heap usage (not exact)
+  // binary keys, values and hashCodes of rows, lined up by index
+  private byte[][] keys;
+  private byte[][] values;
+  private int[] hashes;
+  private IndexStore indexes; // The heap over the keys, storing indexes in the array.
 
-  // binary keys, binary values and hashcodes of keys, lined up by index
-  protected final byte[][] keys;
-  protected final byte[][] values;
-  protected final int[] hashes;
+  private int evicted; // recently evicted index (used for next key/value)
+  private int excluded; // count of excluded rows from previous flush
 
-  protected int evicted;    // recetly evicted index (the biggest one. used for next key/value)
-  protected int excluded;   // count of excluded rows from previous flush
+  // temporary stuff used for vectorization
+  private int batchNumForwards = 0; // whether current batch has any forwarded keys
+  private int[] indexToBatchIndex; // mapping of index (lined up w/keys) to index in the batch
 
-  protected final Comparator<Integer> C = new Comparator<Integer>() {
+  private boolean isEnabled = false;
+
+  private final Comparator<Integer> C = new Comparator<Integer>() {
     public int compare(Integer o1, Integer o2) {
       byte[] key1 = keys[o1];
       byte[] key2 = keys[o2];
@@ -70,29 +86,201 @@ abstract class TopNHash {
     }
   };
 
-  public static TopNHash create0() {
-    return new HashForLimit0();
-  }
-
-  public static TopNHash create(boolean grouped, int topN, long threshold,
-      BinaryCollector collector) {
+  public void initialize(
+      int topN, float memUsage, boolean isMapGroupBy, BinaryCollector collector) {
+    assert topN >= 0 && memUsage > 0;
+    assert !this.isEnabled;
+    this.isEnabled = false;
+    this.topN = topN;
+    this.collector = collector;
     if (topN == 0) {
-      return new HashForLimit0();
+      isEnabled = true;
+      return; // topN == 0 will cause a short-circuit, don't need any initialization
     }
-    if (grouped) {
-      return new HashForGroup(topN, threshold, collector);
-    }
-    return new HashForRow(topN, threshold, collector);
-  }
 
-  TopNHash(int topN, long threshold, BinaryCollector collector) {
-    this.topN = topN;
-    this.threshold = threshold;
-    this.collector = collector;
+    // limit * 64 : compensation of arrays for key/value/hashcodes
+    this.threshold = (long) (memUsage * Runtime.getRuntime().maxMemory()) - topN * 64;
+    if (threshold < 0) {
+      return;
+    }
+    this.indexes = isMapGroupBy ? new HashForGroup() : new HashForRow();
     this.keys = new byte[topN + 1][];
     this.values = new byte[topN + 1][];
     this.hashes = new int[topN + 1];
     this.evicted = topN;
+    this.isEnabled = true;
+  }
+
+  /**
+   * Try to store the non-vectorized key.
+   * @param key Serialized key.
+   * @return TopNHash.FORWARD if the row should be forwarded;
+   *         TopNHash.EXCLUDED if the row should be discarded;
+   *         any other number if the row is to be stored; the index should be passed to storeValue.
+   */
+  public int tryStoreKey(BytesWritable key) throws HiveException, IOException {
+    if (!isEnabled) {
+      return FORWARD; // short-circuit quickly - forward all rows
+    }
+    if (topN == 0) {
+      return EXCLUDED; // short-circuit quickly - eat all rows
+    }
+    int index = insertKeyIntoHeap(key);
+    if (index >= 0) {
+      usage += key.getLength();
+      return index;
+    }
+    // IndexStore is trying to tell us something.
+    switch (index) {
+      case DISABLE: {
+        LOG.info("Top-N hash is disabled");
+        flushInternal();
+        isEnabled = false;
+        return FORWARD;
+      }
+      case FLUSH: {
+        LOG.info("Top-N hash is flushed");
+        flushInternal();
+        // we can now retry adding key/value into hash, which is flushed.
+        // but for simplicity, just forward them
+        return FORWARD;
+      }
+      case FORWARD:  return FORWARD;
+      case EXCLUDED: return EXCLUDED; // skip the row.
+      default: {
+        assert false;
+        throw new HiveException("Invalid result trying to store the key: " + index);
+      }
+    }
+  }
+
+
+  /**
+   * Perform basic checks and initialize TopNHash for the new vectorized row batch.
+   * @return TopNHash.FORWARD if all rows should be forwarded w/o trying to call TopN;
+   *         TopNHash.EXCLUDED if all rows should be discarded w/o trying to call TopN;
+   *         any other result means the batch has been started.
+   */
+  public int startVectorizedBatch() throws IOException, HiveException {
+    if (!isEnabled) {
+      return FORWARD; // short-circuit quickly - forward all rows
+    } else if (topN == 0) {
+      return EXCLUDED; // short-circuit quickly - eat all rows
+    }
+    // Flush here if the memory usage is too high. After that, we have the entire
+    // batch already in memory anyway so we will bypass the memory checks.
+    if (usage > threshold) {
+      int excluded = this.excluded;
+      LOG.info("Top-N hash is flushing rows");
+      flushInternal();
+      if (excluded == 0) {
+        LOG.info("Top-N hash has been disabled");
+        isEnabled = false;
+        return FORWARD; // Hash is ineffective, disable.
+      }
+    }
+    if (indexToBatchIndex == null) {
+      indexToBatchIndex = new int[topN + 1]; // for current batch, contains key index in the batch
+    }
+    Arrays.fill(indexToBatchIndex, -1);
+    batchNumForwards = 0;
+    return 0;
+  }
+
+  /**
+   * Try to put the key from the current vectorized batch into the heap.
+   * @param key the key.
+   * @param batchIndex The index of the key in the vectorized batch (sequential, not .selected).
+   * @param results The results; the number of elements equivalent to vrg.size, by kindex.
+   *   The result should be the same across the calls for the batch; in the end, for each k-index:
+   *     - TopNHash.EXCLUDED - discard the row.
+   *     - non-negative index - store the row using storeValue, same as tryStoreKey.
+   *     - any other negative index - forward the row. getVectorizedKeyToForward called w/this
+   *        index will return the key to use so it doesn't have to be rebuilt.
+   */
+  public void tryStoreVectorizedKey(BytesWritable key, int batchIndex, int[] results)
+          throws HiveException, IOException {
+    // Assumption - batchIndex is increasing; startVectorizedBatch was called
+    int size = indexes.size();
+    int index = size < topN ? size : evicted;
+    keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
+    Integer collisionIndex = indexes.store(index);
+    if (null != collisionIndex) {
+      // forward conditional on the survival of the corresponding key currently in indexes.
+      ++batchNumForwards;
+      results[batchIndex] = MAY_FORWARD - collisionIndex;
+      return;
+    }
+    indexToBatchIndex[index] = batchIndex;
+    results[batchIndex] = index;
+    if (size != topN) return;
+    evicted = indexes.removeBiggest();  // remove the biggest key
+    if (index == evicted) {
+      excluded++;
+      results[batchIndex] = EXCLUDED;
+      indexToBatchIndex[index] = -1;
+      return; // input key is bigger than any of keys in hash
+    }
+    removed(evicted);
+    int evictedBatchIndex = indexToBatchIndex[evicted];
+    if (evictedBatchIndex >= 0) {
+      // reset the result for the evicted index
+      results[evictedBatchIndex] = EXCLUDED;
+      indexToBatchIndex[evicted] = -1;
+    }
+    // Also evict all results grouped with this index; cannot be current key or before it.
+    if (batchNumForwards > 0) {
+      int evictedForward = (MAY_FORWARD - evicted);
+      boolean forwardRemoved = false;
+      for (int i = evictedBatchIndex + 1; i < batchIndex; ++i) {
+        if (results[i] == evictedForward) {
+          results[i] = EXCLUDED;
+          forwardRemoved = true;
+        }
+      }
+      if (forwardRemoved) {
+        --batchNumForwards;
+      }
+    }
+  }
+
+  /**
+   * After vectorized batch is processed, can return the key that caused a particular row
+   * to be forwarded. Because the row could only be marked to forward because it has
+   * the same key with some row already in the heap (for GBY), we can use that key from the
+   * heap to emit the forwarded row.
+   * @param index Negative index from the vectorized result. See tryStoreVectorizedKey.
+   * @return The key corresponding to the row.
+   */
+  public byte[] getVectorizedKeyToForward(int index) {
+    assert index <= MAY_FORWARD;
+    return keys[MAY_FORWARD - index];
+  }
+
+  /**
+   * Stores the value for the key in the heap.
+   * @param index The index, either from tryStoreKey or from tryStoreVectorizedKey result.
+   * @param value The value to store.
+   * @param keyHash The key hash to store.
+   * @param vectorized Whether the result is coming from a vectorized batch.
+   */
+  public void storeValue(int index, BytesWritable value, int keyHash, boolean vectorized) {
+    values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
+    hashes[index] = keyHash;
+    // Vectorized doesn't adjust usage for the keys while processing the batch
+    usage += values[index].length + (vectorized ? keys[index].length : 0);
+  }
+
+  /**
+   * Flushes all the rows cached in the heap.
+   */
+  public void flush() throws HiveException {
+    if (!isEnabled || (topN == 0)) return;
+    try {
+      flushInternal();
+    } catch (IOException ex) {
+      throw new HiveException(ex);
+    }
   }
 
   /**
@@ -104,15 +292,14 @@ abstract class TopNHash {
    * -3 for FLUSH     : memory is not enough. flush values (keep keys only)
    * -4 for DISABLE   : hash is not effective. flush and disable it
    */
-  public int indexOf(HiveKey key) {
-    int size = size();
+  private int insertKeyIntoHeap(BinaryComparable key) {
     if (usage > threshold) {
       return excluded == 0 ? DISABLE : FLUSH;
     }
+    int size = indexes.size();
     int index = size < topN ? size : evicted;
     keys[index] = Arrays.copyOf(key.getBytes(), key.getLength());
-    hashes[index] = key.hashCode();
-    if (!store(index)) {
+    if (null != indexes.store(index)) {
       // it's only for GBY which should forward all values associated with the key in the range
       // of limit. new value should be attatched with the key but in current implementation,
       // only one values is allowed. with map-aggreagtion which is true by default,
@@ -120,7 +307,7 @@ abstract class TopNHash {
       return FORWARD;
     }
     if (size == topN) {
-      evicted = removeBiggest();  // remove the biggest key
+      evicted = indexes.removeBiggest();  // remove the biggest key
       if (index == evicted) {
         excluded++;
         return EXCLUDED;          // input key is bigger than any of keys in hash
@@ -130,130 +317,93 @@ abstract class TopNHash {
     return index;
   }
 
-  protected abstract int size();
-
-  protected abstract boolean store(int index);
-
-  protected abstract int removeBiggest();
-
-  protected abstract Iterable<Integer> indexes();
-
   // key/value of the index is removed. retrieve memory usage
-  public void removed(int index) {
+  private void removed(int index) {
     usage -= keys[index].length;
     keys[index] = null;
     if (values[index] != null) {
-      // value can be null if hash is flushed, which only keeps keys for limiting rows
       usage -= values[index].length;
       values[index] = null;
     }
     hashes[index] = -1;
   }
 
-  public void set(int index, BytesWritable value) {
-    values[index] = Arrays.copyOf(value.getBytes(), value.getLength());
-    usage += keys[index].length + values[index].length;
-  }
-
-  public void flush() throws IOException {
-    for (int index : indexes()) {
-      flush(index);
+  private void flushInternal() throws IOException, HiveException {
+    for (int index : indexes.indexes()) {
+      if (index != evicted && values[index] != null) {
+        collector.collect(keys[index], values[index], hashes[index]);
+        usage -= values[index].length;
+        values[index] = null;
+        hashes[index] = -1;
+      }
     }
     excluded = 0;
   }
 
-  protected void flush(int index) throws IOException {
-    if (index != evicted && values[index] != null) {
-      // BytesWritable copies array for set method. So just creats new one
-      HiveKey keyWritable = new HiveKey(keys[index], hashes[index]);
-      BytesWritable valueWritable = new BytesWritable(values[index]);
-      collector.collect(keyWritable, valueWritable);
-      usage -= values[index].length;
-      values[index] = null;
-    }
-  }
-}
-
-/**
- * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
- * MinMaxPriorityQueue is used because it alows duplication and fast access to biggest one
- */
-class HashForRow extends TopNHash {
-
-  private final MinMaxPriorityQueue<Integer> indexes;
-
-  HashForRow(int topN, long threshold, BinaryCollector collector) {
-    super(topN, threshold, collector);
-    this.indexes = MinMaxPriorityQueue.orderedBy(C).create();
-  }
-
-  protected int size() {
-    return indexes.size();
-  }
-
-  // returns true always
-  protected boolean store(int index) {
-    return indexes.add(index);
+  private interface IndexStore {
+    int size();
+    /**
+     * @return the index which caused the item to be rejected; or null if accepted
+     */
+    Integer store(int index);
+    int removeBiggest();
+    Iterable<Integer> indexes();
   }
 
-  protected int removeBiggest() {
-    return indexes.removeLast();
-  }
-
-  protected Iterable<Integer> indexes() {
-    Integer[] array = indexes.toArray(new Integer[indexes.size()]);
-    Arrays.sort(array, 0, array.length, C);
-    return Arrays.asList(array);
-  }
-}
-
-/**
- * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
- * simple TreeMap is used because group by does not need keep duplicated keys
- */
-class HashForGroup extends TopNHash {
+  /**
+   * for order by, same keys are counted (For 1-2-2-3-4, limit 3 is 1-2-2)
+   * MinMaxPriorityQueue is used because it allows duplication and fast access to biggest one
+   */
+  private class HashForRow implements IndexStore {
+    private final MinMaxPriorityQueue<Integer> indexes = MinMaxPriorityQueue.orderedBy(C).create();
 
-  private final SortedSet<Integer> indexes;
+    public int size() {
+      return indexes.size();
+    }
 
-  HashForGroup(int topN, long threshold, BinaryCollector collector) {
-    super(topN, threshold, collector);
-    this.indexes = new TreeSet<Integer>(C);
-  }
+    // returns null always
+    public Integer store(int index) {
+      boolean result = indexes.add(index);
+      assert result;
+      return null;
+    }
 
-  protected int size() {
-    return indexes.size();
-  }
+    public int removeBiggest() {
+      return indexes.removeLast();
+    }
 
-  // returns false if index already exists in map
-  protected boolean store(int index) {
-    return indexes.add(index);
+    public Iterable<Integer> indexes() {
+      Integer[] array = indexes.toArray(new Integer[indexes.size()]);
+      Arrays.sort(array, 0, array.length, C);
+      return Arrays.asList(array);
+    }
   }
 
-  protected int removeBiggest() {
-    Integer last = indexes.last();
-    indexes.remove(last);
-    return last;
-  }
+  /**
+   * for group by, same keys are not counted (For 1-2-2-3-4, limit 3 is 1-2-(2)-3)
+   * simple TreeMap is used because group by does not need to keep duplicated keys
+   */
+  private class HashForGroup implements IndexStore {
+    // TreeSet anyway uses TreeMap; so use plain TreeMap to be able to get value in collisions.
+    private final TreeMap<Integer, Integer> indexes = new TreeMap<Integer, Integer>(C);
 
-  protected Iterable<Integer> indexes() {
-    return indexes;
-  }
-}
+    public int size() {
+      return indexes.size();
+    }
 
-class HashForLimit0 extends TopNHash {
+    // returns the index already stored under an equal key, or null if the key is new
+    public Integer store(int index) {
+      return indexes.put(index, index);
+    }
 
-  HashForLimit0() {
-    super(0, 0, null);
-  }
+    public int removeBiggest() {
+      Integer last = indexes.lastKey();
+      indexes.remove(last);
+      return last;
+    }
 
-  @Override
-  public int indexOf(HiveKey key) {
-    return EXCLUDED;
+    public Iterable<Integer> indexes() {
+      return indexes.keySet();
+    }
   }
-
-  protected int size() { return 0; }
-  protected boolean store(int index) { return false; }
-  protected int removeBiggest() { return 0; }
-  protected Iterable<Integer> indexes() { return Collections.emptyList(); }
 }
-

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Nov  5 07:01:32 2013
@@ -94,6 +94,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
+import org.apache.hadoop.hive.common.HiveStatsUtils;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.Warehouse;
@@ -1641,30 +1642,6 @@ public final class Utilities {
     }
   }
 
-  /**
-   * Get all file status from a root path and recursively go deep into certain levels.
-   *
-   * @param path
-   *          the root path
-   * @param level
-   *          the depth of directory should explore
-   * @param fs
-   *          the file system
-   * @return array of FileStatus
-   * @throws IOException
-   */
-  public static FileStatus[] getFileStatusRecurse(Path path, int level, FileSystem fs)
-      throws IOException {
-
-    // construct a path pattern (e.g., /*/*) to find all dynamically generated paths
-    StringBuilder sb = new StringBuilder(path.toUri().getPath());
-    for (int i = 0; i < level; ++i) {
-      sb.append(Path.SEPARATOR).append("*");
-    }
-    Path pathPattern = new Path(path, sb.toString());
-    return fs.globStatus(pathPattern);
-  }
-
   public static void mvFileToFinalPath(String specPath, Configuration hconf,
       boolean success, Log log, DynamicPartitionCtx dpCtx, FileSinkDesc conf,
       Reporter reporter) throws IOException,
@@ -1770,7 +1747,7 @@ public final class Utilities {
 
     ArrayList<String> result = new ArrayList<String>();
     if (dpCtx != null) {
-      FileStatus parts[] = getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
+      FileStatus parts[] = HiveStatsUtils.getFileStatusRecurse(path, dpCtx.getNumDPCols(), fs);
       HashMap<String, FileStatus> taskIDToFile = null;
 
       for (int i = 0; i < parts.length; ++i) {
@@ -2291,7 +2268,7 @@ public final class Utilities {
       Path loadPath = new Path(dpCtx.getRootPath());
       FileSystem fs = loadPath.getFileSystem(conf);
       int numDPCols = dpCtx.getNumDPCols();
-      FileStatus[] status = Utilities.getFileStatusRecurse(loadPath, numDPCols, fs);
+      FileStatus[] status = HiveStatsUtils.getFileStatusRecurse(loadPath, numDPCols, fs);
 
       if (status.length == 0) {
         LOG.warn("No partition is generated by dynamic partitioning");

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Nov  5 07:01:32 2013
@@ -766,8 +766,7 @@ public class ExecDriver extends Task<Map
       if (hadoopLocalMode && (oneProp.equals(hadoopSysDir) || oneProp.equals(hadoopWorkDir))) {
         continue;
       }
-
-      tempConf.set(oneProp, deltaP.getProperty(oneProp));
+      tempConf.set(oneProp, hconf.get(oneProp));
     }
 
     // Multiple concurrent local mode job submissions can cause collisions in

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java?rev=1538880&r1=1538879&r2=1538880&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFileSinkOperator.java Tue Nov  5 07:01:32 2013
@@ -25,7 +25,7 @@ import org.apache.hadoop.hive.ql.exec.ve
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
-import org.apache.hadoop.hive.ql.stats.StatsSetupConst;
+import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;