You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/10 23:41:17 UTC

svn commit: r1624140 [2/3] - in /hive/branches/cbo: ./ common/src/java/org/apache/hadoop/hive/conf/ hbase-handler/ hbase-handler/if/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/ hbase-...

Modified: hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java (original)
+++ hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java Wed Sep 10 21:41:16 2014
@@ -17,30 +17,38 @@
  */
 package org.apache.hadoop.hive.metastore.txn;
 
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.shims.ShimLoader;
-
 import java.sql.Connection;
 import java.sql.Driver;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.util.Properties;
 
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
 /**
- * Utility methods for creating and destroying txn database/schema.  Placed
- * here in a separate class so it can be shared across unit tests.
+ * Utility methods for creating and destroying txn database/schema.
+ * Placed here in a separate class so it can be shared across unit tests.
  */
-public class TxnDbUtil {
-  private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+public final class TxnDbUtil {
+
+  private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+  private TxnDbUtil() {
+    throw new UnsupportedOperationException("Can't initialize class");
+  }
 
   /**
    * Set up the configuration so it will use the DbTxnManager, concurrency will be set to true,
    * and the JDBC configs will be set for putting the transaction and lock info in the embedded
    * metastore.
-   * @param conf HiveConf to add these values to.
+   *
+   * @param conf HiveConf to add these values to
    */
   public static void setConfValues(HiveConf conf) {
-    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr);
+    conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER);
     conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
   }
 
@@ -49,187 +57,193 @@ public class TxnDbUtil {
     // intended for creating derby databases, and thus will inexorably get
     // out of date with it.  I'm open to any suggestions on how to make this
     // read the file in a build friendly way.
+
     Connection conn = null;
-    boolean committed = false;
+    Statement stmt = null;
     try {
       conn = getConnection();
-      Statement s = conn.createStatement();
-      s.execute("CREATE TABLE TXNS (" +
-          "  TXN_ID bigint PRIMARY KEY," +
-          "  TXN_STATE char(1) NOT NULL," +
-          "  TXN_STARTED bigint NOT NULL," +
-          "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
-          "  TXN_USER varchar(128) NOT NULL," +
-          "  TXN_HOST varchar(128) NOT NULL)");
-
-      s.execute("CREATE TABLE TXN_COMPONENTS (" +
-      "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
-      "  TC_DATABASE varchar(128) NOT NULL," +
-      "  TC_TABLE varchar(128)," +
-      "  TC_PARTITION varchar(767))");
-      s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
-          "  CTC_TXNID bigint," +
-          "  CTC_DATABASE varchar(128) NOT NULL," +
-          "  CTC_TABLE varchar(128)," +
-          "  CTC_PARTITION varchar(767))");
-      s.execute("CREATE TABLE NEXT_TXN_ID (" +
-          "  NTXN_NEXT bigint NOT NULL)");
-      s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
-      s.execute("CREATE TABLE HIVE_LOCKS (" +
-          " HL_LOCK_EXT_ID bigint NOT NULL," +
-          " HL_LOCK_INT_ID bigint NOT NULL," +
-          " HL_TXNID bigint," +
-          " HL_DB varchar(128) NOT NULL," +
-          " HL_TABLE varchar(128)," +
-          " HL_PARTITION varchar(767)," +
-          " HL_LOCK_STATE char(1) NOT NULL," +
-          " HL_LOCK_TYPE char(1) NOT NULL," +
-          " HL_LAST_HEARTBEAT bigint NOT NULL," +
-          " HL_ACQUIRED_AT bigint," +
-          " HL_USER varchar(128) NOT NULL," +
-          " HL_HOST varchar(128) NOT NULL," +
-          " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
-      s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
-
-      s.execute("CREATE TABLE NEXT_LOCK_ID (" +
-          " NL_NEXT bigint NOT NULL)");
-      s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
-
-      s.execute("CREATE TABLE COMPACTION_QUEUE (" +
-          " CQ_ID bigint PRIMARY KEY," +
-          " CQ_DATABASE varchar(128) NOT NULL," +
-          " CQ_TABLE varchar(128) NOT NULL," +
-          " CQ_PARTITION varchar(767)," +
-          " CQ_STATE char(1) NOT NULL," +
-          " CQ_TYPE char(1) NOT NULL," +
-          " CQ_WORKER_ID varchar(128)," +
-          " CQ_START bigint," +
-          " CQ_RUN_AS varchar(128))");
+      stmt = conn.createStatement();
+      stmt.execute("CREATE TABLE TXNS (" +
+                   "  TXN_ID bigint PRIMARY KEY," +
+                   "  TXN_STATE char(1) NOT NULL," +
+                   "  TXN_STARTED bigint NOT NULL," +
+                   "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
+                   "  TXN_USER varchar(128) NOT NULL," +
+                   "  TXN_HOST varchar(128) NOT NULL)");
+
+      stmt.execute("CREATE TABLE TXN_COMPONENTS (" +
+                   "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
+                   "  TC_DATABASE varchar(128) NOT NULL," +
+                   "  TC_TABLE varchar(128)," +
+                   "  TC_PARTITION varchar(767))");
+      stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
+                   "  CTC_TXNID bigint," +
+                   "  CTC_DATABASE varchar(128) NOT NULL," +
+                   "  CTC_TABLE varchar(128)," +
+                   "  CTC_PARTITION varchar(767))");
+      stmt.execute("CREATE TABLE NEXT_TXN_ID (" + "  NTXN_NEXT bigint NOT NULL)");
+      stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
+      stmt.execute("CREATE TABLE HIVE_LOCKS (" +
+                   " HL_LOCK_EXT_ID bigint NOT NULL," +
+                   " HL_LOCK_INT_ID bigint NOT NULL," +
+                   " HL_TXNID bigint," +
+                   " HL_DB varchar(128) NOT NULL," +
+                   " HL_TABLE varchar(128)," +
+                   " HL_PARTITION varchar(767)," +
+                   " HL_LOCK_STATE char(1) NOT NULL," +
+                   " HL_LOCK_TYPE char(1) NOT NULL," +
+                   " HL_LAST_HEARTBEAT bigint NOT NULL," +
+                   " HL_ACQUIRED_AT bigint," +
+                   " HL_USER varchar(128) NOT NULL," +
+                   " HL_HOST varchar(128) NOT NULL," +
+                   " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+      stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
+
+      stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");
+      stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
+
+      stmt.execute("CREATE TABLE COMPACTION_QUEUE (" +
+                   " CQ_ID bigint PRIMARY KEY," +
+                   " CQ_DATABASE varchar(128) NOT NULL," +
+                   " CQ_TABLE varchar(128) NOT NULL," +
+                   " CQ_PARTITION varchar(767)," +
+                   " CQ_STATE char(1) NOT NULL," +
+                   " CQ_TYPE char(1) NOT NULL," +
+                   " CQ_WORKER_ID varchar(128)," +
+                   " CQ_START bigint," +
+                   " CQ_RUN_AS varchar(128))");
 
-      s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
-      s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+      stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
+      stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
 
       conn.commit();
-      committed = true;
     } finally {
-      if (!committed) conn.rollback();
-      conn.close();
+      closeResources(conn, stmt, null);
     }
   }
 
-  public static void cleanDb() throws  Exception {
+  public static void cleanDb() throws Exception {
     Connection conn = null;
-    boolean committed = false;
+    Statement stmt = null;
     try {
       conn = getConnection();
-      Statement s = conn.createStatement();
+      stmt = conn.createStatement();
+
       // We want to try these, whether they succeed or fail.
       try {
-        s.execute("DROP INDEX HL_TXNID_INDEX");
-      } catch (Exception e) {
-        System.err.println("Unable to drop index HL_TXNID_INDEX " +
-            e.getMessage());
-      }
-      try {
-        s.execute("DROP TABLE TXN_COMPONENTS");
-      } catch (Exception e) {
-        System.err.println("Unable to drop table TXN_COMPONENTS " +
-            e.getMessage());
-      }
-      try {
-        s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS");
-      } catch (Exception e) {
-        System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " +
-            e.getMessage());
-      }
-      try {
-        s.execute("DROP TABLE TXNS");
-      } catch (Exception e) {
-        System.err.println("Unable to drop table TXNS " +
-            e.getMessage());
-      }
-      try {
-        s.execute("DROP TABLE NEXT_TXN_ID");
-      } catch (Exception e) {
-        System.err.println("Unable to drop table NEXT_TXN_ID " +
-            e.getMessage());
-      }
-      try {
-        s.execute("DROP TABLE HIVE_LOCKS");
-      } catch (Exception e) {
-        System.err.println("Unable to drop table HIVE_LOCKS " +
-            e.getMessage());
-      }
-      try {
-        s.execute("DROP TABLE NEXT_LOCK_ID");
-      } catch (Exception e) {
-      }
-      try {
-        s.execute("DROP TABLE COMPACTION_QUEUE");
-      } catch (Exception e) {
-      }
-      try {
-        s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID");
+        stmt.execute("DROP INDEX HL_TXNID_INDEX");
       } catch (Exception e) {
+        System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage());
       }
+
+      dropTable(stmt, "TXN_COMPONENTS");
+      dropTable(stmt, "COMPLETED_TXN_COMPONENTS");
+      dropTable(stmt, "TXNS");
+      dropTable(stmt, "NEXT_TXN_ID");
+      dropTable(stmt, "HIVE_LOCKS");
+      dropTable(stmt, "NEXT_LOCK_ID");
+      dropTable(stmt, "COMPACTION_QUEUE");
+      dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
+
       conn.commit();
-      committed = true;
     } finally {
-      if (!committed) conn.rollback();
-      conn.close();
+      closeResources(conn, stmt, null);
+    }
+  }
+
+  private static void dropTable(Statement stmt, String name) {
+    try {
+      stmt.execute("DROP TABLE " + name);
+    } catch (Exception e) {
+      System.err.println("Unable to drop table " + name + ": " + e.getMessage());
     }
   }
 
   /**
    * A tool to count the number of partitions, tables,
    * and databases locked by a particular lockId.
+   *
    * @param lockId lock id to look for lock components
+   *
    * @return number of components, or 0 if there is no lock
    */
-  public static int countLockComponents(long lockId) throws  Exception {
-    Connection conn = getConnection();
+  public static int countLockComponents(long lockId) throws Exception {
+    Connection conn = null;
+    PreparedStatement stmt = null;
+    ResultSet rs = null;
     try {
-      Statement s = conn.createStatement();
-      ResultSet rs = s.executeQuery("select count(*) from hive_locks where hl_lock_ext_id = " +
-          lockId);
-      if (!rs.next()) return 0;
-      int rc = rs.getInt(1);
-      return rc;
+      conn = getConnection();
+      stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?");
+      stmt.setLong(1, lockId);
+      rs = stmt.executeQuery();
+      if (!rs.next()) {
+        return 0;
+      }
+      return rs.getInt(1);
     } finally {
-      conn.rollback();
-      conn.close();
+      closeResources(conn, stmt, rs);
     }
   }
 
   public static int findNumCurrentLocks() throws Exception {
     Connection conn = null;
+    Statement stmt = null;
+    ResultSet rs = null;
     try {
       conn = getConnection();
-      Statement s = conn.createStatement();
-      ResultSet rs = s.executeQuery("select count(*) from hive_locks");
-      if (!rs.next()) return 0;
-      int rc = rs.getInt(1);
-      return rc;
-    } finally {
-      if (conn != null) {
-        conn.rollback();
-        conn.close();
+      stmt = conn.createStatement();
+      rs = stmt.executeQuery("select count(*) from hive_locks");
+      if (!rs.next()) {
+        return 0;
       }
+      return rs.getInt(1);
+    } finally {
+      closeResources(conn, stmt, rs);
     }
   }
 
   private static Connection getConnection() throws Exception {
     HiveConf conf = new HiveConf();
     String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
-    Driver driver = (Driver)Class.forName(jdbcDriver).newInstance();
+    Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
     Properties prop = new Properties();
     String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY);
     String user = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME);
-    String passwd = ShimLoader.getHadoopShims().getPassword(conf,
-        HiveConf.ConfVars.METASTOREPWD.varname);
-    prop.put("user", user);
-    prop.put("password", passwd);
+    String passwd =
+      ShimLoader.getHadoopShims().getPassword(conf, HiveConf.ConfVars.METASTOREPWD.varname);
+    prop.setProperty("user", user);
+    prop.setProperty("password", passwd);
     return driver.connect(driverUrl, prop);
   }
 
+  private static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
+    if (rs != null) {
+      try {
+        rs.close();
+      } catch (SQLException e) {
+        System.err.println("Error closing ResultSet: " + e.getMessage());
+      }
+    }
+
+    if (stmt != null) {
+      try {
+        stmt.close();
+      } catch (SQLException e) {
+        System.err.println("Error closing Statement: " + e.getMessage());
+      }
+    }
+
+    if (conn != null) {
+      try {
+        conn.rollback();
+      } catch (SQLException e) {
+        System.err.println("Error rolling back: " + e.getMessage());
+      }
+      try {
+        conn.close();
+      } catch (SQLException e) {
+        System.err.println("Error closing Connection: " + e.getMessage());
+      }
+    }
+  }
 }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java Wed Sep 10 21:41:16 2014
@@ -185,7 +185,6 @@ public class QueryProperties {
     return this.filterWithSubQuery;
   }
 
-
   public void clear() {
     hasJoin = false;
     hasGroupBy = false;

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Sep 10 21:41:16 2014
@@ -1351,9 +1351,9 @@ public class DDLTask extends Task<DDLWor
         if(harPartitionDir.getUserInfo() != null) {
           authority.append(harPartitionDir.getUserInfo()).append("@");
         }
-        authority.append(harPartitionDir.getHost()).append(":");
+        authority.append(harPartitionDir.getHost());
         if(harPartitionDir.getPort() != -1) {
-          authority.append(harPartitionDir.getPort());
+          authority.append(":").append(harPartitionDir.getPort());
         }
         Path harPath = new Path(harPartitionDir.getScheme(),
             authority.toString(),

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Wed Sep 10 21:41:16 2014
@@ -557,7 +557,7 @@ public final class FunctionRegistry {
         try {
           FunctionTask.addFunctionResources(func.getResourceUris());
         } catch (Exception e) {
-          LOG.error("Unable to load resources for " + dbName + "." + fName + ":" + e);
+          LOG.error("Unable to load resources for " + dbName + "." + fName + ":" + e.getMessage(), e);
           return null;
         }
 

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Sep 10 21:41:16 2014
@@ -92,6 +92,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hive.common.HiveInterruptCallback;
 import org.apache.hadoop.hive.common.HiveInterruptUtils;
 import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -160,6 +161,7 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.Text;
@@ -3423,6 +3425,41 @@ public final class Utilities {
     }
   }
 
+  public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath,
+      FsPermission fsPermission, boolean recursive) throws IOException {
+    String origUmask = null;
+    LOG.debug("Create dirs " + mkdirPath + " with permission " + fsPermission + " recursive "
+        + recursive);
+    if (recursive) {
+      origUmask = conf.get(FsPermission.UMASK_LABEL);
+      // This umask override is required because the default HDFS umask is 022, which results
+      // in all parents getting the permission (fsPermission & ~022) instead of fsPermission.
+      conf.set(FsPermission.UMASK_LABEL, "000");
+    }
+    FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);
+    boolean retval = false;
+    try {
+      retval = fs.mkdirs(mkdirPath, fsPermission);
+      resetUmaskInConf(conf, recursive, origUmask);
+    } catch (IOException ioe) {
+      resetUmaskInConf(conf, recursive, origUmask);
+      throw ioe;
+    } finally {
+      IOUtils.closeStream(fs);
+    }
+    return retval;
+  }
+
+  private static void resetUmaskInConf(Configuration conf, boolean unsetUmask, String origUmask) {
+    if (unsetUmask) {
+      if (origUmask != null) {
+        conf.set(FsPermission.UMASK_LABEL, origUmask);
+      } else {
+        conf.unset(FsPermission.UMASK_LABEL);
+      }
+    }
+  }
+
   /**
    * Returns true if a plan is both configured for vectorized execution
    * and vectorization is allowed. The plan may be configured for vectorization

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java Wed Sep 10 21:41:16 2014
@@ -19,16 +19,20 @@
 package org.apache.hadoop.hive.ql.exec.vector;
 
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 
 class AggregateDefinition {
+
   private String name;
   private VectorExpressionDescriptor.ArgumentType type;
+  private GroupByDesc.Mode mode;
   private Class<? extends VectorAggregateExpression> aggClass;
 
   AggregateDefinition(String name, VectorExpressionDescriptor.ArgumentType type, 
-            Class<? extends VectorAggregateExpression> aggClass) {
+		  GroupByDesc.Mode mode, Class<? extends VectorAggregateExpression> aggClass) {
     this.name = name;
     this.type = type;
+    this.mode = mode;
     this.aggClass = aggClass;
   }
 
@@ -38,6 +42,9 @@ class AggregateDefinition {
   VectorExpressionDescriptor.ArgumentType getType() {
     return type;
   }
+  GroupByDesc.Mode getMode() {
+	return mode;
+  }
   Class<? extends VectorAggregateExpression> getAggClass() {
     return aggClass;
   }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Wed Sep 10 21:41:16 2014
@@ -32,8 +32,11 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
 import org.apache.hadoop.hive.ql.exec.GroupByOperator;
 import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -47,13 +50,17 @@ import org.apache.hadoop.hive.ql.plan.ap
 import org.apache.hadoop.hive.ql.util.JavaDataModel;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
 
 /**
  * Vectorized GROUP BY operator implementation. Consumes the vectorized input and
  * stores the aggregate operators' intermediate states. Emits row mode output.
  *
  */
-public class VectorGroupByOperator extends GroupByOperator {
+public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion {
 
   private static final Log LOG = LogFactory.getLog(
       VectorGroupByOperator.class.getName());
@@ -70,6 +77,17 @@ public class VectorGroupByOperator exten
    */
   private VectorExpression[] keyExpressions;
 
+  private boolean isVectorOutput;
+
+  // Create a new outgoing vectorization context because column name map will change.
+  private VectorizationContext vOutContext = null;
+
+  private String fileKey;
+
+  // The above members are initialized by the constructor and must not be
+  // transient.
+  //---------------------------------------------------------------------------
+
   private transient VectorExpressionWriter[] keyOutputWriters;
 
   /**
@@ -85,11 +103,18 @@ public class VectorGroupByOperator exten
 
   private transient Object[] forwardCache;
 
+  private transient VectorizedRowBatch outputBatch;
+  private transient VectorizedRowBatchCtx vrbCtx;
+
+  private transient VectorColumnAssign[] vectorColumnAssign;
+  
   /**
-   * Interface for processing mode: global, hash or streaming
+   * Interface for processing mode: global, hash, unsorted streaming, or group batch
    */
   private static interface IProcessingMode {
     public void initialize(Configuration hconf) throws HiveException;
+    public void startGroup() throws HiveException;
+    public void endGroup() throws HiveException;
     public void processBatch(VectorizedRowBatch batch) throws HiveException;
     public void close(boolean aborted) throws HiveException;
   }
@@ -98,6 +123,15 @@ public class VectorGroupByOperator exten
    * Base class for all processing modes
    */
   private abstract class ProcessingModeBase implements IProcessingMode {
+
+    // Overridden and used in sorted reduce group batch processing mode.
+    public void startGroup() throws HiveException {
+      // Do nothing.
+    }
+    public void endGroup() throws HiveException {
+      // Do nothing.
+    }
+
     /**
      * Evaluates the aggregators on the current batch.
      * The aggregationBatchInfo must have been prepared
@@ -170,7 +204,7 @@ public class VectorGroupByOperator exten
     @Override
     public void close(boolean aborted) throws HiveException {
       if (!aborted) {
-        flushSingleRow(null, aggregationBuffers);
+        writeSingleRow(null, aggregationBuffers);
       }
     }
   }
@@ -426,7 +460,7 @@ public class VectorGroupByOperator exten
       while(iter.hasNext()) {
         Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
 
-        flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
+        writeSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
 
         if (!all) {
           iter.remove();
@@ -501,20 +535,21 @@ public class VectorGroupByOperator exten
         if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
           flush(true);
 
-          changeToStreamingMode();
+          changeToUnsortedStreamingMode();
         }
       }
     }
   }
 
   /**
-   * Streaming processing mode. Intermediate values are flushed each time key changes.
-   * In this mode we're relying on the MR shuffle and merge the intermediates in the reduce.
+   * Unsorted streaming processing mode. Each input VectorizedRowBatch may have
+   * a mix of different keys (hence unsorted).  Intermediate values are flushed
+   * each time key changes.
    */
-  private class ProcessingModeStreaming extends ProcessingModeBase {
+  private class ProcessingModeUnsortedStreaming extends ProcessingModeBase {
 
     /** 
-     * The aggreagation buffers used in streaming mode
+     * The aggregation buffers used in streaming mode
      */
     private VectorAggregationBufferRow currentStreamingAggregators;
 
@@ -557,7 +592,7 @@ public class VectorGroupByOperator exten
               // Nothing to do
             }
           });
-      LOG.info("using streaming aggregation processing mode");
+      LOG.info("using unsorted streaming aggregation processing mode");
     }
 
     @Override
@@ -601,7 +636,7 @@ public class VectorGroupByOperator exten
 
       // Now flush/forward all keys/rows, except the last (current) one
       for (int i = 0; i < flushMark; ++i) {
-        flushSingleRow(keysToFlush[i], rowsToFlush[i]);
+        writeSingleRow(keysToFlush[i], rowsToFlush[i]);
         rowsToFlush[i].reset();
         streamAggregationBufferRowPool.putInPool(rowsToFlush[i]);
       }
@@ -610,7 +645,79 @@ public class VectorGroupByOperator exten
     @Override
     public void close(boolean aborted) throws HiveException {
       if (!aborted && null != streamingKey) {
-        flushSingleRow(streamingKey, currentStreamingAggregators);
+        writeSingleRow(streamingKey, currentStreamingAggregators);
+      }
+    }
+  }
+
+  /**
+   * Sorted reduce group batch processing mode. Each input VectorizedRowBatch will have the
+   * same key.  On endGroup (or close), the intermediate values are flushed.
+   */
+  private class ProcessingModeGroupBatches extends ProcessingModeBase {
+
+    private boolean inGroup;
+    private boolean first;
+
+    /**
+     * The group vector key helper.
+     */
+    VectorGroupKeyHelper groupKeyHelper;
+
+    /** 
+     * The group vector aggregation buffers.
+     */
+    private VectorAggregationBufferRow groupAggregators;
+
+    /**
+     * Buffer to hold string values.
+     */
+    private DataOutputBuffer buffer;
+
+    @Override
+    public void initialize(Configuration hconf) throws HiveException {
+      inGroup = false;
+      groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length);
+      groupKeyHelper.init(keyExpressions);
+      groupAggregators = allocateAggregationBuffer();
+      buffer = new DataOutputBuffer();
+      LOG.info("using sorted group batch aggregation processing mode");
+    }
+
+    @Override
+    public void startGroup() throws HiveException {
+      inGroup = true;
+      first = true;
+    }
+
+    @Override
+    public void endGroup() throws HiveException {
+      if (inGroup && !first) {
+        writeGroupRow(groupAggregators, buffer);
+        groupAggregators.reset();
+      }
+      inGroup = false;
+    }
+
+    @Override
+    public void processBatch(VectorizedRowBatch batch) throws HiveException {
+      assert(inGroup);
+      if (first) {
+        // Copy the group key to output batch now.  We'll copy in the aggregates at the end of the group.
+        first = false;
+        groupKeyHelper.copyGroupKey(batch, outputBatch, buffer);
+      }
+
+      // Aggregate this batch.
+      for (int i = 0; i < aggregators.length; ++i) {
+        aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch);
+      }
+    }
+
+    @Override
+    public void close(boolean aborted) throws HiveException {
+      if (!aborted && inGroup && !first) {
+        writeGroupRow(groupAggregators, buffer);
       }
     }
   }
@@ -633,8 +740,20 @@ public class VectorGroupByOperator exten
     aggregators = new VectorAggregateExpression[aggrDesc.size()];
     for (int i = 0; i < aggrDesc.size(); ++i) {
       AggregationDesc aggDesc = aggrDesc.get(i);
-      aggregators[i] = vContext.getAggregatorExpression(aggDesc);
+      aggregators[i] = vContext.getAggregatorExpression(aggDesc, desc.getVectorDesc().isReduce());
     }
+    
+    isVectorOutput = desc.getVectorDesc().isVectorOutput();
+
+    List<String> outColNames = desc.getOutputColumnNames();
+    Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
+    int outColIndex = 0;
+    for(String outCol: outColNames) {
+      mapOutCols.put(outCol,  outColIndex++);
+    }
+    vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+    vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_");
+    fileKey = vOutContext.getFileKey();
   }
 
   public VectorGroupByOperator() {
@@ -662,13 +781,23 @@ public class VectorGroupByOperator exten
         objectInspectors.add(aggregators[i].getOutputObjectInspector());
       }
 
-      keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
-      aggregationBatchInfo = new VectorAggregationBufferBatch();
-      aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
-
+      if (!conf.getVectorDesc().isVectorGroupBatches()) {
+        // These data structures are only used by the map-side processing modes.
+        keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+        aggregationBatchInfo = new VectorAggregationBufferBatch();
+        aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
+      }
+      LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput);
       List<String> outputFieldNames = conf.getOutputColumnNames();
       outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
           outputFieldNames, objectInspectors);
+      if (isVectorOutput) {
+          vrbCtx = new VectorizedRowBatchCtx();
+          vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector);
+          outputBatch = vrbCtx.createVectorizedRowBatch();
+          vectorColumnAssign = VectorColumnAssignFactory.buildAssigners(
+              outputBatch, outputObjInspector, vOutContext.getColumnMap(), conf.getOutputColumnNames());
+      }
 
     } catch (HiveException he) {
       throw he;
@@ -678,32 +807,43 @@ public class VectorGroupByOperator exten
 
     initializeChildren(hconf);
 
-    forwardCache =new Object[keyExpressions.length + aggregators.length];
+    forwardCache = new Object[keyExpressions.length + aggregators.length];
 
     if (keyExpressions.length == 0) {
-      processingMode = this.new ProcessingModeGlobalAggregate();
-    }
-    else {
-      //TODO: consider if parent can offer order guarantees
-      // If input is sorted, is more efficient to use the streaming mode
+        processingMode = this.new ProcessingModeGlobalAggregate();
+    } else if (conf.getVectorDesc().isVectorGroupBatches()) {
+      // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce).
+      processingMode = this.new ProcessingModeGroupBatches();
+    } else {
+      // We start in hash mode and may dynamically switch to unsorted stream mode.
       processingMode = this.new ProcessingModeHashAggregate();
     }
     processingMode.initialize(hconf);
   }
 
   /**
-   * changes the processing mode to streaming
+   * Changes the processing mode to unsorted streaming.
    * This is done at the request of the hash agg mode, if the number of keys 
    * exceeds the minReductionHashAggr factor
    * @throws HiveException 
    */
-  private void changeToStreamingMode() throws HiveException {
-    processingMode = this.new ProcessingModeStreaming();
+  private void changeToUnsortedStreamingMode() throws HiveException {
+    processingMode = this.new ProcessingModeUnsortedStreaming();
     processingMode.initialize(null);
     LOG.trace("switched to streaming mode");
   }
 
   @Override
+  public void startGroup() throws HiveException {
+    processingMode.startGroup();
+  }
+
+  @Override
+  public void endGroup() throws HiveException {
+    processingMode.endGroup();
+  }
+
+  @Override
   public void processOp(Object row, int tag) throws HiveException {
     VectorizedRowBatch batch = (VectorizedRowBatch) row;
 
@@ -719,26 +859,72 @@ public class VectorGroupByOperator exten
    * @param agg
    * @throws HiveException
    */
-  private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
+  private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
       throws HiveException {
     int fi = 0;
-    for (int i = 0; i < keyExpressions.length; ++i) {
-      forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
-          kw, i, keyOutputWriters[i]);
+    if (!isVectorOutput) {
+      // Output row.
+      for (int i = 0; i < keyExpressions.length; ++i) {
+        forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
+            kw, i, keyOutputWriters[i]);
+      }
+      for (int i = 0; i < aggregators.length; ++i) {
+        forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(String.format("forwarding keys: %s: %s",
+            kw, Arrays.toString(forwardCache)));
+      }
+      forward(forwardCache, outputObjInspector);
+    } else {
+      // Output keys and aggregates into the output batch.
+      for (int i = 0; i < keyExpressions.length; ++i) {
+        vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue (
+                  kw, i, keyOutputWriters[i]), outputBatch.size);
+      }
+      for (int i = 0; i < aggregators.length; ++i) {
+        vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
+                  agg.getAggregationBuffer(i)), outputBatch.size);
+      }
+      ++outputBatch.size;
+      if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+        flushOutput();
+      }
     }
+  }
+
+  /**
+   * Emits a (reduce) group row, made from the key (copied in at the beginning of the group) and
+   * the values of the row's aggregation buffers.
+   * @param agg
+   * @param buffer
+   * @throws HiveException
+   */
+  private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer)
+      throws HiveException {
+    int fi = keyExpressions.length;   // Start after group keys.
     for (int i = 0; i < aggregators.length; ++i) {
-      forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
+      vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
+                agg.getAggregationBuffer(i)), outputBatch.size);
     }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug(String.format("forwarding keys: %s: %s",
-          kw, Arrays.toString(forwardCache)));
+    ++outputBatch.size;
+    if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+      flushOutput();
+      buffer.reset();
     }
-    forward(forwardCache, outputObjInspector);
+  }
+
+  private void flushOutput() throws HiveException {
+    forward(outputBatch, null);
+    outputBatch.reset();
   }
 
   @Override
   public void closeOp(boolean aborted) throws HiveException {
     processingMode.close(aborted);
+    if (!aborted && isVectorOutput && outputBatch.size > 0) {
+      flushOutput();
+    }
   }
 
   static public String getOperatorName() {
@@ -761,4 +947,8 @@ public class VectorGroupByOperator exten
     this.aggregators = aggregators;
   }
 
+  @Override
+  public VectorizationContext getOuputVectorizationContext() {
+    return vOutContext;
+  }
 }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java Wed Sep 10 21:41:16 2014
@@ -60,6 +60,7 @@ public class VectorHashKeyWrapper extend
     byteStarts = new int[byteValuesCount];
     byteLengths = new int[byteValuesCount];
     isNull = new boolean[longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount];
+    hashcode = 0;
   }
 
   private VectorHashKeyWrapper() {

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java Wed Sep 10 21:41:16 2014
@@ -32,41 +32,10 @@ import org.apache.hadoop.hive.serde2.laz
  * This class stores additional information about keys needed to evaluate and output the key values.
  *
  */
-public class VectorHashKeyWrapperBatch {
+public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
 
-  /**
-   * Helper class for looking up a key value based on key index.
-   */
-  private static class KeyLookupHelper {
-    private int longIndex;
-    private int doubleIndex;
-    private int stringIndex;
-    private int decimalIndex;
-
-    private static final int INDEX_UNUSED = -1;
-
-    private void resetIndices() {
-        this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED;
-    }
-    public void setLong(int index) {
-        resetIndices();
-        this.longIndex= index;
-    }
-
-    public void setDouble(int index) {
-        resetIndices();
-        this.doubleIndex = index;
-    }
-
-    public void setString(int index) {
-        resetIndices();
-        this.stringIndex = index;
-    }
-
-    public void setDecimal(int index) {
-        resetIndices();
-        this.decimalIndex = index;
-    }
+  public VectorHashKeyWrapperBatch(int keyCount) {
+    super(keyCount);
   }
 
   /**
@@ -80,26 +49,6 @@ public class VectorHashKeyWrapperBatch {
   private VectorExpression[] keyExpressions;
 
   /**
-   * indices of LONG primitive keys.
-   */
-  private int[] longIndices;
-
-  /**
-   * indices of DOUBLE primitive keys.
-   */
-  private int[] doubleIndices;
-
-  /**
-   * indices of string (byte[]) primitive keys.
-   */
-  private int[] stringIndices;
-
-  /**
-   * indices of decimal primitive keys.
-   */
-  private int[] decimalIndices;
-
-  /**
    * Pre-allocated batch size vector of keys wrappers.
    * N.B. these keys are **mutable** and should never be used in a HashMap.
    * Always clone the key wrapper to obtain an immutable keywrapper suitable
@@ -108,11 +57,6 @@ public class VectorHashKeyWrapperBatch {
   private VectorHashKeyWrapper[] vectorHashKeyWrappers;
 
   /**
-   * Lookup vector to map from key index to primitive type index.
-   */
-  private KeyLookupHelper[] indexLookup;
-
-  /**
    * The fixed size of the key wrappers.
    */
   private int keysFixedSize;
@@ -567,53 +511,17 @@ public class VectorHashKeyWrapperBatch {
    */
   public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(VectorExpression[] keyExpressions)
     throws HiveException {
-    VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch();
+    VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch(keyExpressions.length);
     compiledKeyWrapperBatch.keyExpressions = keyExpressions;
 
     compiledKeyWrapperBatch.keysFixedSize = 0;
 
-    // We'll overallocate and then shrink the array for each type
-    int[] longIndices = new int[keyExpressions.length];
-    int longIndicesIndex = 0;
-    int[] doubleIndices = new int[keyExpressions.length];
-    int doubleIndicesIndex  = 0;
-    int[] stringIndices = new int[keyExpressions.length];
-    int stringIndicesIndex = 0;
-    int[] decimalIndices = new int[keyExpressions.length];
-    int decimalIndicesIndex = 0;
-    KeyLookupHelper[] indexLookup = new KeyLookupHelper[keyExpressions.length];
-
     // Inspect the output type of each key expression.
     for(int i=0; i < keyExpressions.length; ++i) {
-      indexLookup[i] = new KeyLookupHelper();
-      String outputType = keyExpressions[i].getOutputType();
-      if (VectorizationContext.isIntFamily(outputType) ||
-          VectorizationContext.isDatetimeFamily(outputType)) {
-        longIndices[longIndicesIndex] = i;
-        indexLookup[i].setLong(longIndicesIndex);
-        ++longIndicesIndex;
-      } else if (VectorizationContext.isFloatFamily(outputType)) {
-        doubleIndices[doubleIndicesIndex] = i;
-        indexLookup[i].setDouble(doubleIndicesIndex);
-        ++doubleIndicesIndex;
-      } else if (VectorizationContext.isStringFamily(outputType)) {
-        stringIndices[stringIndicesIndex]= i;
-        indexLookup[i].setString(stringIndicesIndex);
-        ++stringIndicesIndex;
-      } else if (VectorizationContext.isDecimalFamily(outputType)) {
-          decimalIndices[decimalIndicesIndex]= i;
-          indexLookup[i].setDecimal(decimalIndicesIndex);
-          ++decimalIndicesIndex;
-      }
-      else {
-        throw new HiveException("Unsuported vector output type: " + outputType);
-      }
-    }
-    compiledKeyWrapperBatch.indexLookup = indexLookup;
-    compiledKeyWrapperBatch.longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
-    compiledKeyWrapperBatch.doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
-    compiledKeyWrapperBatch.stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex);
-    compiledKeyWrapperBatch.decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex);
+      compiledKeyWrapperBatch.addKey(keyExpressions[i].getOutputType());
+    }
+    compiledKeyWrapperBatch.finishAdding();
+
     compiledKeyWrapperBatch.vectorHashKeyWrappers =
         new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
     for(int i=0;i<VectorizedRowBatch.DEFAULT_SIZE; ++i) {
@@ -632,11 +540,11 @@ public class VectorHashKeyWrapperBatch {
         model.memoryAlign());
 
     // Now add the key wrapper arrays
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForLongArrayOfSize(longIndicesIndex);
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(doubleIndicesIndex);
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(stringIndicesIndex);
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(decimalIndicesIndex);
-    compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(longIndicesIndex) * 2;
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForLongArrayOfSize(compiledKeyWrapperBatch.longIndices.length);
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(compiledKeyWrapperBatch.doubleIndices.length);
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.stringIndices.length);
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.decimalIndices.length);
+    compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(compiledKeyWrapperBatch.longIndices.length) * 2;
     compiledKeyWrapperBatch.keysFixedSize +=
         model.lengthForBooleanArrayOfSize(keyExpressions.length);
 

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Wed Sep 10 21:41:16 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.UD
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.InputExpressionType;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
+import org.apache.hadoop.hive.ql.exec.vector.AggregateDefinition;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.udf.SettableUDF;
 import org.apache.hadoop.hive.ql.udf.UDFConv;
 import org.apache.hadoop.hive.ql.udf.UDFHex;
@@ -198,7 +200,8 @@ public class VectorizationContext {
 
   protected int getInputColumnIndex(String name) {
     if (!columnMap.containsKey(name)) {
-      LOG.error(String.format("The column %s is not in the vectorization context column map.", name));
+      LOG.error(String.format("The column %s is not in the vectorization context column map %s.", 
+                 name, columnMap.toString()));
     }
     return columnMap.get(name);
   }
@@ -1880,50 +1883,55 @@ public class VectorizationContext {
     }
   }
 
+  // TODO: When we support vectorized STRUCTs and can handle more on the reduce side (MERGEPARTIAL):
+  // TODO:   Write reduce-side versions of AVG. Currently, only map-side (HASH) versions are in the table.
+  // TODO:   Also investigate whether different reduce-side versions are needed for var* and std*, or
+  //         whether the map-side aggregates can be used. For now they are conservatively marked map-side (HASH).
   static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFMinLong.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFMinDouble.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMinString.class));
-    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFMinDecimal.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFMaxLong.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFMaxDouble.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMaxString.class));
-    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFMaxDecimal.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,          VectorUDAFCountStar.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFCount.class));
-    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFCount.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFSumLong.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFSumDouble.class));
-    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFSumDecimal.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFAvgLong.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFAvgDouble.class));
-    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFAvgDecimal.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFVarPopLong.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFVarPopDouble.class));
-    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFVarPopDecimal.class));
-    add(new AggregateDefinition("var_samp",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFVarSampLong.class));
-    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFVarSampDouble.class));
-    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFVarSampDecimal.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFStdPopLong.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFStdPopDouble.class));
-    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFStdPopDecimal.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    VectorUDAFStdSampLong.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  VectorUDAFStdSampDouble.class));
-    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL,       VectorUDAFStdSampDecimal.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFMinLong.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFMinDouble.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                          VectorUDAFMinString.class));
+    add(new AggregateDefinition("min",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFMinDecimal.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFMaxLong.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFMaxDouble.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null,                          VectorUDAFMaxString.class));
+    add(new AggregateDefinition("max",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFMaxDecimal.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.NONE,          GroupByDesc.Mode.HASH,         VectorUDAFCountStar.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("count",       VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFCount.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    null,                          VectorUDAFSumLong.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  null,                          VectorUDAFSumDouble.class));
+    add(new AggregateDefinition("sum",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       null,                          VectorUDAFSumDecimal.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFAvgLong.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFAvgDouble.class));
+    add(new AggregateDefinition("avg",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFAvgDecimal.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarPopLong.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarPopDouble.class));
+    add(new AggregateDefinition("variance",    VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_pop",     VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarPopDecimal.class));
+    add(new AggregateDefinition("var_samp",    VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFVarSampLong.class));
+    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFVarSampDouble.class));
+    add(new AggregateDefinition("var_samp" ,   VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFVarSampDecimal.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdPopLong.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdPopDouble.class));
+    add(new AggregateDefinition("std",         VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev",      VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_pop",  VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdPopDecimal.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,    GroupByDesc.Mode.HASH,         VectorUDAFStdSampLong.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY,  GroupByDesc.Mode.HASH,         VectorUDAFStdSampDouble.class));
+    add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL,       GroupByDesc.Mode.HASH,         VectorUDAFStdSampDecimal.class));
   }};
 
-  public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc)
+  public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce)
       throws HiveException {
 
     ArrayList<ExprNodeDesc> paramDescList = desc.getParameters();
@@ -1948,8 +1956,15 @@ public class VectorizationContext {
     for (AggregateDefinition aggDef : aggregatesDefinition) {
       if (aggregateName.equalsIgnoreCase(aggDef.getName()) &&
           ((aggDef.getType() == VectorExpressionDescriptor.ArgumentType.NONE &&
-            inputType == VectorExpressionDescriptor.ArgumentType.NONE) ||
+           inputType == VectorExpressionDescriptor.ArgumentType.NONE) ||
           (aggDef.getType().isSameTypeOrFamily(inputType)))) {
+
+    	if (aggDef.getMode() == GroupByDesc.Mode.HASH && isReduce) {
+    	  continue;
+    	} else if (aggDef.getMode() == GroupByDesc.Mode.MERGEPARTIAL && !isReduce) {
+    	  continue;
+    	}
+
         Class<? extends VectorAggregateExpression> aggClass = aggDef.getAggClass();
         try
         {
@@ -1967,7 +1982,7 @@ public class VectorizationContext {
     }
 
     throw new HiveException("Vector aggregate not implemented: \"" + aggregateName +
-        "\" for type: \"" + inputType.name() + "");
+        "\" for type: \"" + inputType.name() + " (reduce-side = " + isReduce + ")");
   }
 
   public Map<Integer, String> getOutputColumnTypeMap() {

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Wed Sep 10 21:41:16 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.IOPrepareCache;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.serde2.Deserializer;
@@ -124,15 +125,20 @@ public class VectorizedRowBatchCtx {
    * Used by non-tablescan operators when they change the vectorization context 
    * @param hiveConf
    * @param fileKey 
-   *          The key on which to retrieve the extra column mapping from the map scratch
+   *          The key used to look up the scratch column type mapping of the map or reduce work
    * @param rowOI
    *          Object inspector that shapes the column types
    */
   public void init(Configuration hiveConf, String fileKey,
       StructObjectInspector rowOI) {
-    columnTypeMap = Utilities
-        .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes()
-        .get(fileKey);
+    MapredWork mapredWork = Utilities.getMapRedWork(hiveConf);
+    Map<String, Map<Integer, String>> scratchColumnVectorTypes;
+    if (mapredWork.getMapWork() != null) {
+      scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes();
+    } else {
+      scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes();
+    }
+    columnTypeMap = scratchColumnVectorTypes.get(fileKey);
     this.rowOI= rowOI;
     this.rawRowOI = rowOI;
   }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Sep 10 21:41:16 2014
@@ -2336,6 +2336,12 @@ class RecordReaderImpl implements Record
       return ((DecimalColumnStatistics) index).getMaximum();
     } else if (index instanceof TimestampColumnStatistics) {
       return ((TimestampColumnStatistics) index).getMaximum();
+    } else if (index instanceof BooleanColumnStatistics) {
+      if (((BooleanColumnStatistics)index).getTrueCount()!=0) {
+        return "true";
+      } else {
+        return "false";
+      }
     } else {
       return null;
     }
@@ -2360,6 +2366,12 @@ class RecordReaderImpl implements Record
       return ((DecimalColumnStatistics) index).getMinimum();
     } else if (index instanceof TimestampColumnStatistics) {
       return ((TimestampColumnStatistics) index).getMinimum();
+    } else if (index instanceof BooleanColumnStatistics) {
+      if (((BooleanColumnStatistics)index).getFalseCount()!=0) {
+        return "false";
+      } else {
+        return "true";
+      }
     } else {
       return null;
     }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Wed Sep 10 21:41:16 2014
@@ -331,6 +331,8 @@ final class SearchArgumentImpl implement
             return PredicateLeaf.Type.TIMESTAMP;
           case DECIMAL:
             return PredicateLeaf.Type.DECIMAL;
+          case BOOLEAN:
+        	return PredicateLeaf.Type.BOOLEAN;
           default:
         }
       }
@@ -368,6 +370,7 @@ final class SearchArgumentImpl implement
         case DATE:
         case TIMESTAMP:
         case DECIMAL:
+        case BOOLEAN:
           return lit;
         default:
           throw new IllegalArgumentException("Unknown literal " + getType(lit));
@@ -963,7 +966,8 @@ final class SearchArgumentImpl implement
           literal instanceof DateWritable ||
           literal instanceof Timestamp ||
           literal instanceof HiveDecimal ||
-          literal instanceof BigDecimal) {
+          literal instanceof BigDecimal ||
+          literal instanceof Boolean) {
         return literal;
       } else if (literal instanceof HiveChar ||
           literal instanceof HiveVarchar) {
@@ -1000,6 +1004,8 @@ final class SearchArgumentImpl implement
       }else if (literal instanceof HiveDecimal ||
           literal instanceof BigDecimal) {
         return PredicateLeaf.Type.DECIMAL;
+      } else if (literal instanceof Boolean) {
+    	return PredicateLeaf.Type.BOOLEAN;
       }
       throw new IllegalArgumentException("Unknown type for literal " + literal);
     }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Wed Sep 10 21:41:16 2014
@@ -364,8 +364,10 @@ abstract public class AbstractSMBJoinPro
     for (int pos = 0; pos < sortCols.size(); pos++) {
       Order o = sortCols.get(pos);
 
-      if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) {
-        return false;
+      if (pos < sortColumnsFirstPartition.size()) {
+        if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) {
+          return false;
+        }
       }
       sortColNames.add(o.getCol());
     }

Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Wed Sep 10 21:41:16 2014
@@ -38,9 +38,11 @@ import org.apache.hadoop.hive.ql.exec.mr
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
 import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
 import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
 import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.Ba
 import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hive.ql.plan.SM
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.UDFAcos;
 import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -290,23 +294,26 @@ public class Vectorizer implements Physi
         throws SemanticException {
       Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
       if (currTask instanceof MapRedTask) {
-        convertMapWork(((MapRedTask) currTask).getWork().getMapWork());
+        convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false);
       } else if (currTask instanceof TezTask) {
         TezWork work = ((TezTask) currTask).getWork();
         for (BaseWork w: work.getAllWork()) {
           if (w instanceof MapWork) {
-            convertMapWork((MapWork)w);
+            convertMapWork((MapWork) w, true);
           } else if (w instanceof ReduceWork) {
             // We are only vectorizing Reduce under Tez.
-            convertReduceWork((ReduceWork)w);
+            if (HiveConf.getBoolVar(pctx.getConf(),
+                        HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
+              convertReduceWork((ReduceWork) w);
+            }
           }
         }
       }
       return null;
     }
 
-    private void convertMapWork(MapWork mapWork) throws SemanticException {
-      boolean ret = validateMapWork(mapWork);
+    private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      boolean ret = validateMapWork(mapWork, isTez);
       if (ret) {
         vectorizeMapWork(mapWork);
       }
@@ -319,7 +326,8 @@ public class Vectorizer implements Physi
           + ReduceSinkOperator.getOperatorName()), np);
     }
 
-    private boolean validateMapWork(MapWork mapWork) throws SemanticException {
+    private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+      LOG.info("Validating MapWork...");
 
       // Validate the input format
       for (String path : mapWork.getPathToPartitionInfo().keySet()) {
@@ -333,7 +341,7 @@ public class Vectorizer implements Physi
         }
       }
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
-      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor();
+      MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(isTez);
       addMapWorkRules(opRules, vnp);
       Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
       GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -417,9 +425,12 @@ public class Vectorizer implements Physi
     private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
       opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np);
       opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np);
+      opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + ".*"), np);
     }
 
     private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+      LOG.info("Validating ReduceWork...");
+
       // Validate input to ReduceWork.
       if (!getOnlyStructObjectInspectors(reduceWork)) {
         return false;
@@ -487,16 +498,21 @@ public class Vectorizer implements Physi
 
   class MapWorkValidationNodeProcessor implements NodeProcessor {
 
+    private boolean isTez;
+
+    public MapWorkValidationNodeProcessor(boolean isTez) {
+      this.isTez = isTez;
+    }
+
     @Override
     public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
         Object... nodeOutputs) throws SemanticException {
       for (Node n : stack) {
         Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
-        if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) &&
-            op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
+        if (nonVectorizableChildOfGroupBy(op)) {
           return new Boolean(true);
         }
-        boolean ret = validateMapWorkOperator(op);
+        boolean ret = validateMapWorkOperator(op, isTez);
         if (!ret) {
           LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
           return new Boolean(false);
@@ -513,6 +529,9 @@ public class Vectorizer implements Physi
         Object... nodeOutputs) throws SemanticException {
       for (Node n : stack) {
         Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+        if (nonVectorizableChildOfGroupBy(op)) {
+          return new Boolean(true);
+        }
         boolean ret = validateReduceWorkOperator(op);
         if (!ret) {
           LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
@@ -579,21 +598,6 @@ public class Vectorizer implements Physi
       return vContext;
     }
 
-    public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
-      Operator<? extends OperatorDesc> currentOp = op;
-      while (currentOp.getParentOperators().size() > 0) {
-        currentOp = currentOp.getParentOperators().get(0);
-        if (currentOp.getType().equals(OperatorType.GROUPBY)) {
-          // No need to vectorize
-          if (!opsDone.contains(op)) {
-            opsDone.add(op);
-          }
-          return true;
-        }
-      }
-      return false;
-    }
-
     public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext)
             throws SemanticException {
       Operator<? extends OperatorDesc> vectorOp = op;
@@ -665,9 +669,13 @@ public class Vectorizer implements Physi
 
       assert vContext != null;
 
-      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs.  So, don't vectorize
-      // any operators below GROUPBY.
+      // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+      // vectorize the operators below it.
       if (nonVectorizableChildOfGroupBy(op)) {
+        // No need to vectorize
+        if (!opsDone.contains(op)) {
+            opsDone.add(op);
+          }
         return null;
       }
 
@@ -719,13 +727,22 @@ public class Vectorizer implements Physi
 
       assert vContext != null;
 
-      // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs.  So, don't vectorize
-      // any operators below GROUPBY.
+      // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+      // vectorize the operators below it.
       if (nonVectorizableChildOfGroupBy(op)) {
+        // No need to vectorize
+        if (!opsDone.contains(op)) {
+          opsDone.add(op);
+        }
         return null;
       }
 
       Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+      if (vectorOp instanceof VectorGroupByOperator) {
+        VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp;
+        VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc();
+        vectorDesc.setVectorGroupBatches(true);
+      }
       if (saveRootVectorOp && op != vectorOp) {
         rootVectorOp = vectorOp;
       }
@@ -772,7 +789,7 @@ public class Vectorizer implements Physi
     return pctx;
   }
 
-  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op) {
+  boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, boolean isTez) {
     boolean ret = false;
     switch (op.getType()) {
       case MAPJOIN:
@@ -783,7 +800,7 @@ public class Vectorizer implements Physi
         }
         break;
       case GROUPBY:
-        ret = validateGroupByOperator((GroupByOperator) op);
+        ret = validateGroupByOperator((GroupByOperator) op, false, isTez);
         break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
@@ -814,6 +831,17 @@ public class Vectorizer implements Physi
       case EXTRACT:
         ret = validateExtractOperator((ExtractOperator) op);
         break;
+      case MAPJOIN:
+        // Does MAPJOIN actually get planned in Reduce?
+        if (op instanceof MapJoinOperator) {
+          ret = validateMapJoinOperator((MapJoinOperator) op);
+        } else if (op instanceof SMBMapJoinOperator) {
+          ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+        }
+        break;
+      case GROUPBY:
+        ret = validateGroupByOperator((GroupByOperator) op, true, true);
+        break;
       case FILTER:
         ret = validateFilterOperator((FilterOperator) op);
         break;
@@ -836,6 +864,23 @@ public class Vectorizer implements Physi
     return ret;
   }
 
+  public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
+    Operator<? extends OperatorDesc> currentOp = op;
+    while (currentOp.getParentOperators().size() > 0) {
+      currentOp = currentOp.getParentOperators().get(0);
+      if (currentOp.getType().equals(OperatorType.GROUPBY)) {
+        GroupByDesc desc = (GroupByDesc)currentOp.getConf();
+        boolean isVectorOutput = desc.getVectorDesc().isVectorOutput();
+        if (isVectorOutput) {
+          // This GROUP BY does vectorize its output.
+          return false;
+        }
+        return true;
+      }
+    }
+    return false;
+  }
+
   private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
     SMBJoinDesc desc = op.getConf();
     // Validation is the same as for map join, since the 'small' tables are not vectorized
@@ -886,16 +931,57 @@ public class Vectorizer implements Physi
     return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER);
   }
 
-  private boolean validateGroupByOperator(GroupByOperator op) {
-    if (op.getConf().isGroupingSetsPresent()) {
-      LOG.warn("Grouping sets not supported in vector mode");
+  private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) {
+    GroupByDesc desc = op.getConf();
+    VectorGroupByDesc vectorDesc = desc.getVectorDesc();
+
+    if (desc.isGroupingSetsPresent()) {
+      LOG.info("Grouping sets not supported in vector mode");
       return false;
     }
-    boolean ret = validateExprNodeDesc(op.getConf().getKeys());
+    boolean ret = validateExprNodeDesc(desc.getKeys());
     if (!ret) {
       return false;
     }
-    return validateAggregationDesc(op.getConf().getAggregators());
+    ret = validateAggregationDesc(desc.getAggregators(), isReduce);
+    if (!ret) {
+      return false;
+    }
+    boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce);
+    vectorDesc.setVectorOutput(isVectorOutput);
+    if (isReduce) {
+      if (desc.isDistinct()) {
+        LOG.info("Distinct not supported in reduce vector mode");
+        return false;    
+      }
+      // Sort-based GroupBy?
+      if (desc.getMode() != GroupByDesc.Mode.COMPLETE &&
+          desc.getMode() != GroupByDesc.Mode.PARTIAL1 &&
+          desc.getMode() != GroupByDesc.Mode.PARTIAL2 &&
+          desc.getMode() != GroupByDesc.Mode.MERGEPARTIAL) {
+        LOG.info("Reduce vector mode not supported when input for GROUP BY not sorted");
+        return false;
+      }
+      LOG.info("Reduce GROUP BY mode is " + desc.getMode().name());
+      if (desc.getGroupKeyNotReductionKey()) {
+        LOG.info("Reduce vector mode not supported when group key is not reduction key");
+        return false;    
+      }
+      if (!isVectorOutput) {
+        LOG.info("Reduce vector mode only supported when aggregate outputs are primitive types");
+        return false;    
+      }
+      if (desc.getKeys().size() > 0) {
+        LOG.info("Reduce-side GROUP BY will process key groups");
+        vectorDesc.setVectorGroupBatches(true);
+      } else {
+        LOG.info("Reduce-side GROUP BY will do global aggregation");
+      }
+      vectorDesc.setIsReduce(true);
+    } else {
+      LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput);
+    }
+    return true;
   }
 
   private boolean validateExtractOperator(ExtractOperator op) {
@@ -930,9 +1016,9 @@ public class Vectorizer implements Physi
     return true;
   }
 
-  private boolean validateAggregationDesc(List<AggregationDesc> descs) {
+  private boolean validateAggregationDesc(List<AggregationDesc> descs, boolean isReduce) {
     for (AggregationDesc d : descs) {
-      boolean ret = validateAggregationDesc(d);
+      boolean ret = validateAggregationDesc(d, isReduce);
       if (!ret) {
         return false;
       }
@@ -952,9 +1038,7 @@ public class Vectorizer implements Physi
     String typeName = desc.getTypeInfo().getTypeName();
     boolean ret = validateDataType(typeName);
     if (!ret) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Cannot vectorize " + desc.toString() + " of type " + typeName);
-      }
+      LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
       return false;
     }
     if (desc instanceof ExprNodeGenericFuncDesc) {
@@ -987,12 +1071,11 @@ public class Vectorizer implements Physi
       VectorizationContext vc = new ValidatorVectorizationContext();
       if (vc.getVectorExpression(desc, mode) == null) {
         // TODO: this cannot happen - VectorizationContext throws in such cases.
+        LOG.info("getVectorExpression returned null");
         return false;
       }
     } catch (Exception e) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Failed to vectorize", e);
-      }
+      LOG.info("Failed to vectorize", e);
       return false;
     }
     return true;
@@ -1011,16 +1094,56 @@ public class Vectorizer implements Physi
     }
   }
 
-  private boolean validateAggregationDesc(AggregationDesc aggDesc) {
+  private boolean validateAggregationDesc(AggregationDesc aggDesc, boolean isReduce) {
     if (!supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase())) {
       return false;
     }
     if (aggDesc.getParameters() != null) {
       return validateExprNodeDesc(aggDesc.getParameters());
     }
+    // See if we can vectorize the aggregation.
+    try {
+      VectorizationContext vc = new ValidatorVectorizationContext();
+      if (vc.getAggregatorExpression(aggDesc, isReduce) == null) {
+        // TODO: this cannot happen - VectorizationContext throws in such cases.
+        LOG.info("getAggregatorExpression returned null");
+        return false;
+      }
+    } catch (Exception e) {
+      LOG.info("Failed to vectorize", e);
+      return false;
+    }
+    return true;
+  }
+
+  private boolean aggregatorsOutputIsPrimitive(List<AggregationDesc> descs, boolean isReduce) {
+    for (AggregationDesc d : descs) {
+      boolean ret = aggregatorsOutputIsPrimitive(d, isReduce);
+      if (!ret) {
+        return false;
+      }
+    }
     return true;
   }
 
+  private boolean aggregatorsOutputIsPrimitive(AggregationDesc aggDesc, boolean isReduce) {
+    VectorizationContext vc = new ValidatorVectorizationContext();
+    VectorAggregateExpression vectorAggrExpr;
+    try {
+        vectorAggrExpr = vc.getAggregatorExpression(aggDesc, isReduce);
+    } catch (Exception e) {
+      // We should have already attempted to vectorize in validateAggregationDesc.
+      LOG.info("Vectorization of aggreation should have succeeded ", e);
+      return false;
+    }
+
+    ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector();
+    if (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+      return true;
+    }
+    return false;
+  }
+
   private boolean validateDataType(String type) {
     return supportedDataTypesPattern.matcher(type.toLowerCase()).matches();
   }