Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/10 23:41:17 UTC
svn commit: r1624140 [2/3] - in /hive/branches/cbo: ./
common/src/java/org/apache/hadoop/hive/conf/ hbase-handler/
hbase-handler/if/ hbase-handler/src/java/org/apache/hadoop/hive/hbase/
hbase-handler/src/java/org/apache/hadoop/hive/hbase/struct/ hbase-...
Modified: hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java (original)
+++ hive/branches/cbo/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java Wed Sep 10 21:41:16 2014
@@ -17,30 +17,38 @@
*/
package org.apache.hadoop.hive.metastore.txn;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.shims.ShimLoader;
-
import java.sql.Connection;
import java.sql.Driver;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.shims.ShimLoader;
+
/**
- * Utility methods for creating and destroying txn database/schema. Placed
- * here in a separate class so it can be shared across unit tests.
+ * Utility methods for creating and destroying txn database/schema.
+ * Placed here in a separate class so it can be shared across unit tests.
*/
-public class TxnDbUtil {
- private final static String txnMgr = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+public final class TxnDbUtil {
+
+ private static final String TXN_MANAGER = "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager";
+
+ private TxnDbUtil() {
+ throw new UnsupportedOperationException("Can't initialize class");
+ }
/**
* Set up the configuration so it will use the DbTxnManager, concurrency will be set to true,
* and the JDBC configs will be set for putting the transaction and lock info in the embedded
* metastore.
- * @param conf HiveConf to add these values to.
+ *
+ * @param conf HiveConf to add these values to
*/
public static void setConfValues(HiveConf conf) {
- conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, txnMgr);
+ conf.setVar(HiveConf.ConfVars.HIVE_TXN_MANAGER, TXN_MANAGER);
conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, true);
}
@@ -49,187 +57,193 @@ public class TxnDbUtil {
// intended for creating derby databases, and thus will inexorably get
// out of date with it. I'm open to any suggestions on how to make this
// read the file in a build friendly way.
+
Connection conn = null;
- boolean committed = false;
+ Statement stmt = null;
try {
conn = getConnection();
- Statement s = conn.createStatement();
- s.execute("CREATE TABLE TXNS (" +
- " TXN_ID bigint PRIMARY KEY," +
- " TXN_STATE char(1) NOT NULL," +
- " TXN_STARTED bigint NOT NULL," +
- " TXN_LAST_HEARTBEAT bigint NOT NULL," +
- " TXN_USER varchar(128) NOT NULL," +
- " TXN_HOST varchar(128) NOT NULL)");
-
- s.execute("CREATE TABLE TXN_COMPONENTS (" +
- " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
- " TC_DATABASE varchar(128) NOT NULL," +
- " TC_TABLE varchar(128)," +
- " TC_PARTITION varchar(767))");
- s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
- " CTC_TXNID bigint," +
- " CTC_DATABASE varchar(128) NOT NULL," +
- " CTC_TABLE varchar(128)," +
- " CTC_PARTITION varchar(767))");
- s.execute("CREATE TABLE NEXT_TXN_ID (" +
- " NTXN_NEXT bigint NOT NULL)");
- s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
- s.execute("CREATE TABLE HIVE_LOCKS (" +
- " HL_LOCK_EXT_ID bigint NOT NULL," +
- " HL_LOCK_INT_ID bigint NOT NULL," +
- " HL_TXNID bigint," +
- " HL_DB varchar(128) NOT NULL," +
- " HL_TABLE varchar(128)," +
- " HL_PARTITION varchar(767)," +
- " HL_LOCK_STATE char(1) NOT NULL," +
- " HL_LOCK_TYPE char(1) NOT NULL," +
- " HL_LAST_HEARTBEAT bigint NOT NULL," +
- " HL_ACQUIRED_AT bigint," +
- " HL_USER varchar(128) NOT NULL," +
- " HL_HOST varchar(128) NOT NULL," +
- " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
- s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
-
- s.execute("CREATE TABLE NEXT_LOCK_ID (" +
- " NL_NEXT bigint NOT NULL)");
- s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
-
- s.execute("CREATE TABLE COMPACTION_QUEUE (" +
- " CQ_ID bigint PRIMARY KEY," +
- " CQ_DATABASE varchar(128) NOT NULL," +
- " CQ_TABLE varchar(128) NOT NULL," +
- " CQ_PARTITION varchar(767)," +
- " CQ_STATE char(1) NOT NULL," +
- " CQ_TYPE char(1) NOT NULL," +
- " CQ_WORKER_ID varchar(128)," +
- " CQ_START bigint," +
- " CQ_RUN_AS varchar(128))");
+ stmt = conn.createStatement();
+ stmt.execute("CREATE TABLE TXNS (" +
+ " TXN_ID bigint PRIMARY KEY," +
+ " TXN_STATE char(1) NOT NULL," +
+ " TXN_STARTED bigint NOT NULL," +
+ " TXN_LAST_HEARTBEAT bigint NOT NULL," +
+ " TXN_USER varchar(128) NOT NULL," +
+ " TXN_HOST varchar(128) NOT NULL)");
+
+ stmt.execute("CREATE TABLE TXN_COMPONENTS (" +
+ " TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
+ " TC_DATABASE varchar(128) NOT NULL," +
+ " TC_TABLE varchar(128)," +
+ " TC_PARTITION varchar(767))");
+ stmt.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
+ " CTC_TXNID bigint," +
+ " CTC_DATABASE varchar(128) NOT NULL," +
+ " CTC_TABLE varchar(128)," +
+ " CTC_PARTITION varchar(767))");
+ stmt.execute("CREATE TABLE NEXT_TXN_ID (" + " NTXN_NEXT bigint NOT NULL)");
+ stmt.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
+ stmt.execute("CREATE TABLE HIVE_LOCKS (" +
+ " HL_LOCK_EXT_ID bigint NOT NULL," +
+ " HL_LOCK_INT_ID bigint NOT NULL," +
+ " HL_TXNID bigint," +
+ " HL_DB varchar(128) NOT NULL," +
+ " HL_TABLE varchar(128)," +
+ " HL_PARTITION varchar(767)," +
+ " HL_LOCK_STATE char(1) NOT NULL," +
+ " HL_LOCK_TYPE char(1) NOT NULL," +
+ " HL_LAST_HEARTBEAT bigint NOT NULL," +
+ " HL_ACQUIRED_AT bigint," +
+ " HL_USER varchar(128) NOT NULL," +
+ " HL_HOST varchar(128) NOT NULL," +
+ " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+ stmt.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
+
+ stmt.execute("CREATE TABLE NEXT_LOCK_ID (" + " NL_NEXT bigint NOT NULL)");
+ stmt.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
+
+ stmt.execute("CREATE TABLE COMPACTION_QUEUE (" +
+ " CQ_ID bigint PRIMARY KEY," +
+ " CQ_DATABASE varchar(128) NOT NULL," +
+ " CQ_TABLE varchar(128) NOT NULL," +
+ " CQ_PARTITION varchar(767)," +
+ " CQ_STATE char(1) NOT NULL," +
+ " CQ_TYPE char(1) NOT NULL," +
+ " CQ_WORKER_ID varchar(128)," +
+ " CQ_START bigint," +
+ " CQ_RUN_AS varchar(128))");
- s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
- s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+ stmt.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
+ stmt.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
conn.commit();
- committed = true;
} finally {
- if (!committed) conn.rollback();
- conn.close();
+ closeResources(conn, stmt, null);
}
}
- public static void cleanDb() throws Exception {
+ public static void cleanDb() throws Exception {
Connection conn = null;
- boolean committed = false;
+ Statement stmt = null;
try {
conn = getConnection();
- Statement s = conn.createStatement();
+ stmt = conn.createStatement();
+
// We want to try these, whether they succeed or fail.
try {
- s.execute("DROP INDEX HL_TXNID_INDEX");
- } catch (Exception e) {
- System.err.println("Unable to drop index HL_TXNID_INDEX " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE TXN_COMPONENTS");
- } catch (Exception e) {
- System.err.println("Unable to drop table TXN_COMPONENTS " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS");
- } catch (Exception e) {
- System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE TXNS");
- } catch (Exception e) {
- System.err.println("Unable to drop table TXNS " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE NEXT_TXN_ID");
- } catch (Exception e) {
- System.err.println("Unable to drop table NEXT_TXN_ID " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE HIVE_LOCKS");
- } catch (Exception e) {
- System.err.println("Unable to drop table HIVE_LOCKS " +
- e.getMessage());
- }
- try {
- s.execute("DROP TABLE NEXT_LOCK_ID");
- } catch (Exception e) {
- }
- try {
- s.execute("DROP TABLE COMPACTION_QUEUE");
- } catch (Exception e) {
- }
- try {
- s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID");
+ stmt.execute("DROP INDEX HL_TXNID_INDEX");
} catch (Exception e) {
+ System.err.println("Unable to drop index HL_TXNID_INDEX " + e.getMessage());
}
+
+ dropTable(stmt, "TXN_COMPONENTS");
+ dropTable(stmt, "COMPLETED_TXN_COMPONENTS");
+ dropTable(stmt, "TXNS");
+ dropTable(stmt, "NEXT_TXN_ID");
+ dropTable(stmt, "HIVE_LOCKS");
+ dropTable(stmt, "NEXT_LOCK_ID");
+ dropTable(stmt, "COMPACTION_QUEUE");
+ dropTable(stmt, "NEXT_COMPACTION_QUEUE_ID");
+
conn.commit();
- committed = true;
} finally {
- if (!committed) conn.rollback();
- conn.close();
+ closeResources(conn, stmt, null);
+ }
+ }
+
+ private static void dropTable(Statement stmt, String name) {
+ try {
+ stmt.execute("DROP TABLE " + name);
+ } catch (Exception e) {
+ System.err.println("Unable to drop table " + name + ": " + e.getMessage());
}
}
/**
* A tool to count the number of partitions, tables,
* and databases locked by a particular lockId.
+ *
* @param lockId lock id to look for lock components
+ *
* @return number of components, or 0 if there is no lock
*/
- public static int countLockComponents(long lockId) throws Exception {
- Connection conn = getConnection();
+ public static int countLockComponents(long lockId) throws Exception {
+ Connection conn = null;
+ PreparedStatement stmt = null;
+ ResultSet rs = null;
try {
- Statement s = conn.createStatement();
- ResultSet rs = s.executeQuery("select count(*) from hive_locks where hl_lock_ext_id = " +
- lockId);
- if (!rs.next()) return 0;
- int rc = rs.getInt(1);
- return rc;
+ conn = getConnection();
+ stmt = conn.prepareStatement("SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?");
+ stmt.setLong(1, lockId);
+ rs = stmt.executeQuery();
+ if (!rs.next()) {
+ return 0;
+ }
+ return rs.getInt(1);
} finally {
- conn.rollback();
- conn.close();
+ closeResources(conn, stmt, rs);
}
}
public static int findNumCurrentLocks() throws Exception {
Connection conn = null;
+ Statement stmt = null;
+ ResultSet rs = null;
try {
conn = getConnection();
- Statement s = conn.createStatement();
- ResultSet rs = s.executeQuery("select count(*) from hive_locks");
- if (!rs.next()) return 0;
- int rc = rs.getInt(1);
- return rc;
- } finally {
- if (conn != null) {
- conn.rollback();
- conn.close();
+ stmt = conn.createStatement();
+ rs = stmt.executeQuery("select count(*) from hive_locks");
+ if (!rs.next()) {
+ return 0;
}
+ return rs.getInt(1);
+ } finally {
+ closeResources(conn, stmt, rs);
}
}
private static Connection getConnection() throws Exception {
HiveConf conf = new HiveConf();
String jdbcDriver = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_DRIVER);
- Driver driver = (Driver)Class.forName(jdbcDriver).newInstance();
+ Driver driver = (Driver) Class.forName(jdbcDriver).newInstance();
Properties prop = new Properties();
String driverUrl = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORECONNECTURLKEY);
String user = HiveConf.getVar(conf, HiveConf.ConfVars.METASTORE_CONNECTION_USER_NAME);
- String passwd = ShimLoader.getHadoopShims().getPassword(conf,
- HiveConf.ConfVars.METASTOREPWD.varname);
- prop.put("user", user);
- prop.put("password", passwd);
+ String passwd =
+ ShimLoader.getHadoopShims().getPassword(conf, HiveConf.ConfVars.METASTOREPWD.varname);
+ prop.setProperty("user", user);
+ prop.setProperty("password", passwd);
return driver.connect(driverUrl, prop);
}
+ private static void closeResources(Connection conn, Statement stmt, ResultSet rs) {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ System.err.println("Error closing ResultSet: " + e.getMessage());
+ }
+ }
+
+ if (stmt != null) {
+ try {
+ stmt.close();
+ } catch (SQLException e) {
+ System.err.println("Error closing Statement: " + e.getMessage());
+ }
+ }
+
+ if (conn != null) {
+ try {
+ conn.rollback();
+ } catch (SQLException e) {
+ System.err.println("Error rolling back: " + e.getMessage());
+ }
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ System.err.println("Error closing Connection: " + e.getMessage());
+ }
+ }
+ }
}
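
The TxnDbUtil changes above swap the string-concatenated query in countLockComponents for a bound PreparedStatement parameter and route all cleanup through the new closeResources() helper. On Java 7+ the same cleanup could be expressed with try-with-resources; a minimal sketch of the equivalent lookup (illustrative only — the Connection is assumed to come from the private getConnection() helper, and the manual helper presumably remains for older-JDK compatibility):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;

    final class LockCountSketch {
      static int countLockComponents(Connection conn, long lockId) throws Exception {
        // The bound parameter replaces "... hl_lock_ext_id = " + lockId, so the id
        // is never spliced into the SQL text.
        try (PreparedStatement stmt = conn.prepareStatement(
            "SELECT count(*) FROM hive_locks WHERE hl_lock_ext_id = ?")) {
          stmt.setLong(1, lockId);
          try (ResultSet rs = stmt.executeQuery()) {
            // Resources close in reverse order even on exceptions; the caller
            // still owns rollback/close of the Connection itself.
            return rs.next() ? rs.getInt(1) : 0;
          }
        }
      }
    }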
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/QueryProperties.java Wed Sep 10 21:41:16 2014
@@ -185,7 +185,6 @@ public class QueryProperties {
return this.filterWithSubQuery;
}
-
public void clear() {
hasJoin = false;
hasGroupBy = false;
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java Wed Sep 10 21:41:16 2014
@@ -1351,9 +1351,9 @@ public class DDLTask extends Task<DDLWor
if(harPartitionDir.getUserInfo() != null) {
authority.append(harPartitionDir.getUserInfo()).append("@");
}
- authority.append(harPartitionDir.getHost()).append(":");
+ authority.append(harPartitionDir.getHost());
if(harPartitionDir.getPort() != -1) {
- authority.append(harPartitionDir.getPort());
+ authority.append(":").append(harPartitionDir.getPort());
}
Path harPath = new Path(harPartitionDir.getScheme(),
authority.toString(),
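
This hunk fixes HAR URI construction: the old code appended ":" before checking the port, so a location without an explicit port (getPort() == -1) produced an authority like "node1:" and a malformed har path. A standalone sketch of the corrected behavior (java.net.URI and the host names here are illustrative stand-ins):

    import java.net.URI;

    public class HarAuthoritySketch {
      // Mirrors the fixed logic: emit ":port" only when a port is actually present.
      static String authority(URI dir) {
        StringBuilder authority = new StringBuilder();
        if (dir.getUserInfo() != null) {
          authority.append(dir.getUserInfo()).append("@");
        }
        authority.append(dir.getHost());
        if (dir.getPort() != -1) {
          authority.append(":").append(dir.getPort());
        }
        return authority.toString();
      }

      public static void main(String[] args) throws Exception {
        System.out.println(authority(new URI("har://node1:8020/warehouse/t"))); // node1:8020
        // Before the fix, this case produced "node1:" with a dangling colon.
        System.out.println(authority(new URI("har://node1/warehouse/t")));      // node1
      }
    }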
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/FunctionRegistry.java Wed Sep 10 21:41:16 2014
@@ -557,7 +557,7 @@ public final class FunctionRegistry {
try {
FunctionTask.addFunctionResources(func.getResourceUris());
} catch (Exception e) {
- LOG.error("Unable to load resources for " + dbName + "." + fName + ":" + e);
+ LOG.error("Unable to load resources for " + dbName + "." + fName + ":" + e.getMessage(), e);
return null;
}
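
The one-line change matters for debuggability: concatenating the exception ("+ e") logs only its toString(), while passing the throwable as the final argument lets commons-logging emit the full stack trace. A minimal sketch of the difference (the logger name and exception are hypothetical):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LoggingSketch {
      private static final Log LOG = LogFactory.getLog(LoggingSketch.class);

      public static void main(String[] args) {
        Exception e = new RuntimeException("resource missing");
        LOG.error("Unable to load resources for db.fn:" + e);                 // message only
        LOG.error("Unable to load resources for db.fn:" + e.getMessage(), e); // message + stack trace
      }
    }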
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Wed Sep 10 21:41:16 2014
@@ -92,6 +92,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hive.common.HiveInterruptCallback;
import org.apache.hadoop.hive.common.HiveInterruptUtils;
import org.apache.hadoop.hive.common.HiveStatsUtils;
@@ -160,6 +161,7 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
@@ -3423,6 +3425,41 @@ public final class Utilities {
}
}
+ public static boolean createDirsWithPermission(Configuration conf, Path mkdirPath,
+ FsPermission fsPermission, boolean recursive) throws IOException {
+ String origUmask = null;
+ LOG.debug("Create dirs " + mkdirPath + " with permission " + fsPermission + " recursive "
+ + recursive);
+ if (recursive) {
+ origUmask = conf.get(FsPermission.UMASK_LABEL);
+ // this umask is required because by default the hdfs mask is 022 resulting in
+ // all parents getting the fsPermission & !(022) permission instead of fsPermission
+ conf.set(FsPermission.UMASK_LABEL, "000");
+ }
+ FileSystem fs = ShimLoader.getHadoopShims().getNonCachedFileSystem(mkdirPath.toUri(), conf);
+ boolean retval = false;
+ try {
+ retval = fs.mkdirs(mkdirPath, fsPermission);
+ resetUmaskInConf(conf, recursive, origUmask);
+ } catch (IOException ioe) {
+ resetUmaskInConf(conf, recursive, origUmask);
+ throw ioe;
+ } finally {
+ IOUtils.closeStream(fs);
+ }
+ return retval;
+ }
+
+ private static void resetUmaskInConf(Configuration conf, boolean unsetUmask, String origUmask) {
+ if (unsetUmask) {
+ if (origUmask != null) {
+ conf.set(FsPermission.UMASK_LABEL, origUmask);
+ } else {
+ conf.unset(FsPermission.UMASK_LABEL);
+ }
+ }
+ }
+
/**
* Returns true if a plan is both configured for vectorized execution
* and vectorization is allowed. The plan may be configured for vectorization
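
createDirsWithPermission() temporarily forces the umask (FsPermission.UMASK_LABEL) to "000" for recursive creation: under the default 022 umask, every implicitly created parent would otherwise receive fsPermission & ~022 instead of fsPermission itself. It also uses a non-cached FileSystem so the mutated Configuration is not shared, and restores the original umask whether or not mkdirs succeeds. A hedged usage sketch (the scratch path is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hive.ql.exec.Utilities;

    public class ScratchDirSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FsPermission allRwx = new FsPermission((short) 0777);
        // With recursive=true, intermediate parents of session-1 are created with
        // 777 as well, rather than the umask-masked 755.
        boolean created = Utilities.createDirsWithPermission(
            conf, new Path("/tmp/hive/scratch/session-1"), allRwx, true);
        System.out.println("created = " + created);
      }
    }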
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/AggregateDefinition.java Wed Sep 10 21:41:16 2014
@@ -19,16 +19,20 @@
package org.apache.hadoop.hive.ql.exec.vector;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
class AggregateDefinition {
+
private String name;
private VectorExpressionDescriptor.ArgumentType type;
+ private GroupByDesc.Mode mode;
private Class<? extends VectorAggregateExpression> aggClass;
AggregateDefinition(String name, VectorExpressionDescriptor.ArgumentType type,
- Class<? extends VectorAggregateExpression> aggClass) {
+ GroupByDesc.Mode mode, Class<? extends VectorAggregateExpression> aggClass) {
this.name = name;
this.type = type;
+ this.mode = mode;
this.aggClass = aggClass;
}
@@ -38,6 +42,9 @@ class AggregateDefinition {
VectorExpressionDescriptor.ArgumentType getType() {
return type;
}
+ GroupByDesc.Mode getMode() {
+ return mode;
+ }
Class<? extends VectorAggregateExpression> getAggClass() {
return aggClass;
}
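
The added GroupByDesc.Mode field qualifies when a definition may be chosen: per the lookup change later in this commit, null means either side, HASH restricts the entry to map-side use, and MERGEPARTIAL to reduce-side use. Two entries from the registration table in VectorizationContext show why the distinction matters for count (fragment only — AggregateDefinition is package-private, so registrations live beside it):

    // The map side counts input rows...
    add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,
        GroupByDesc.Mode.HASH, VectorUDAFCount.class));
    // ...while the reduce side merges the partial counts by summing them.
    add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY,
        GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class));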
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Wed Sep 10 21:41:16 2014
@@ -32,8 +32,11 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator;
+import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluatorFactory;
import org.apache.hadoop.hive.ql.exec.GroupByOperator;
import org.apache.hadoop.hive.ql.exec.KeyWrapper;
+import org.apache.hadoop.hive.ql.exec.KeyWrapperFactory;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
@@ -47,13 +50,17 @@ import org.apache.hadoop.hive.ql.plan.ap
import org.apache.hadoop.hive.ql.util.JavaDataModel;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.DataOutputBuffer;
/**
* Vectorized GROUP BY operator implementation. Consumes the vectorized input and
* stores the aggregate operators' intermediate states. Emits row mode output.
*
*/
-public class VectorGroupByOperator extends GroupByOperator {
+public class VectorGroupByOperator extends GroupByOperator implements VectorizationContextRegion {
private static final Log LOG = LogFactory.getLog(
VectorGroupByOperator.class.getName());
@@ -70,6 +77,17 @@ public class VectorGroupByOperator exten
*/
private VectorExpression[] keyExpressions;
+ private boolean isVectorOutput;
+
+ // Create a new outgoing vectorization context because column name map will change.
+ private VectorizationContext vOutContext = null;
+
+ private String fileKey;
+
+ // The above members are initialized by the constructor and must not be
+ // transient.
+ //---------------------------------------------------------------------------
+
private transient VectorExpressionWriter[] keyOutputWriters;
/**
@@ -85,11 +103,18 @@ public class VectorGroupByOperator exten
private transient Object[] forwardCache;
+ private transient VectorizedRowBatch outputBatch;
+ private transient VectorizedRowBatchCtx vrbCtx;
+
+ private transient VectorColumnAssign[] vectorColumnAssign;
+
/**
- * Interface for processing mode: global, hash or streaming
+ * Interface for processing mode: global, hash, unsorted streaming, or group batch
*/
private static interface IProcessingMode {
public void initialize(Configuration hconf) throws HiveException;
+ public void startGroup() throws HiveException;
+ public void endGroup() throws HiveException;
public void processBatch(VectorizedRowBatch batch) throws HiveException;
public void close(boolean aborted) throws HiveException;
}
@@ -98,6 +123,15 @@ public class VectorGroupByOperator exten
* Base class for all processing modes
*/
private abstract class ProcessingModeBase implements IProcessingMode {
+
+ // Overridden and used in sorted reduce group batch processing mode.
+ public void startGroup() throws HiveException {
+ // Do nothing.
+ }
+ public void endGroup() throws HiveException {
+ // Do nothing.
+ }
+
/**
* Evaluates the aggregators on the current batch.
* The aggregationBatchInfo must have been prepared
@@ -170,7 +204,7 @@ public class VectorGroupByOperator exten
@Override
public void close(boolean aborted) throws HiveException {
if (!aborted) {
- flushSingleRow(null, aggregationBuffers);
+ writeSingleRow(null, aggregationBuffers);
}
}
}
@@ -426,7 +460,7 @@ public class VectorGroupByOperator exten
while(iter.hasNext()) {
Map.Entry<KeyWrapper, VectorAggregationBufferRow> pair = iter.next();
- flushSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
+ writeSingleRow((VectorHashKeyWrapper) pair.getKey(), pair.getValue());
if (!all) {
iter.remove();
@@ -501,20 +535,21 @@ public class VectorGroupByOperator exten
if (numEntriesHashTable > sumBatchSize * minReductionHashAggr) {
flush(true);
- changeToStreamingMode();
+ changeToUnsortedStreamingMode();
}
}
}
}
/**
- * Streaming processing mode. Intermediate values are flushed each time key changes.
- * In this mode we're relying on the MR shuffle and merge the intermediates in the reduce.
+ * Unsorted streaming processing mode. Each input VectorizedRowBatch may have
+ * a mix of different keys (hence unsorted). Intermediate values are flushed
+ * each time key changes.
*/
- private class ProcessingModeStreaming extends ProcessingModeBase {
+ private class ProcessingModeUnsortedStreaming extends ProcessingModeBase {
/**
- * The aggreagation buffers used in streaming mode
+ * The aggregation buffers used in streaming mode
*/
private VectorAggregationBufferRow currentStreamingAggregators;
@@ -557,7 +592,7 @@ public class VectorGroupByOperator exten
// Nothing to do
}
});
- LOG.info("using streaming aggregation processing mode");
+ LOG.info("using unsorted streaming aggregation processing mode");
}
@Override
@@ -601,7 +636,7 @@ public class VectorGroupByOperator exten
// Now flush/forward all keys/rows, except the last (current) one
for (int i = 0; i < flushMark; ++i) {
- flushSingleRow(keysToFlush[i], rowsToFlush[i]);
+ writeSingleRow(keysToFlush[i], rowsToFlush[i]);
rowsToFlush[i].reset();
streamAggregationBufferRowPool.putInPool(rowsToFlush[i]);
}
@@ -610,7 +645,79 @@ public class VectorGroupByOperator exten
@Override
public void close(boolean aborted) throws HiveException {
if (!aborted && null != streamingKey) {
- flushSingleRow(streamingKey, currentStreamingAggregators);
+ writeSingleRow(streamingKey, currentStreamingAggregators);
+ }
+ }
+ }
+
+ /**
+ * Sorted reduce group batch processing mode. Each input VectorizedRowBatch will have the
+ * same key. On endGroup (or close), the intermediate values are flushed.
+ */
+ private class ProcessingModeGroupBatches extends ProcessingModeBase {
+
+ private boolean inGroup;
+ private boolean first;
+
+ /**
+ * The group vector key helper.
+ */
+ VectorGroupKeyHelper groupKeyHelper;
+
+ /**
+ * The group vector aggregation buffers.
+ */
+ private VectorAggregationBufferRow groupAggregators;
+
+ /**
+ * Buffer to hold string values.
+ */
+ private DataOutputBuffer buffer;
+
+ @Override
+ public void initialize(Configuration hconf) throws HiveException {
+ inGroup = false;
+ groupKeyHelper = new VectorGroupKeyHelper(keyExpressions.length);
+ groupKeyHelper.init(keyExpressions);
+ groupAggregators = allocateAggregationBuffer();
+ buffer = new DataOutputBuffer();
+ LOG.info("using sorted group batch aggregation processing mode");
+ }
+
+ @Override
+ public void startGroup() throws HiveException {
+ inGroup = true;
+ first = true;
+ }
+
+ @Override
+ public void endGroup() throws HiveException {
+ if (inGroup && !first) {
+ writeGroupRow(groupAggregators, buffer);
+ groupAggregators.reset();
+ }
+ inGroup = false;
+ }
+
+ @Override
+ public void processBatch(VectorizedRowBatch batch) throws HiveException {
+ assert(inGroup);
+ if (first) {
+ // Copy the group key to output batch now. We'll copy in the aggregates at the end of the group.
+ first = false;
+ groupKeyHelper.copyGroupKey(batch, outputBatch, buffer);
+ }
+
+ // Aggregate this batch.
+ for (int i = 0; i < aggregators.length; ++i) {
+ aggregators[i].aggregateInput(groupAggregators.getAggregationBuffer(i), batch);
+ }
+ }
+
+ @Override
+ public void close(boolean aborted) throws HiveException {
+ if (!aborted && inGroup && !first) {
+ writeGroupRow(groupAggregators, buffer);
}
}
}
@@ -633,8 +740,20 @@ public class VectorGroupByOperator exten
aggregators = new VectorAggregateExpression[aggrDesc.size()];
for (int i = 0; i < aggrDesc.size(); ++i) {
AggregationDesc aggDesc = aggrDesc.get(i);
- aggregators[i] = vContext.getAggregatorExpression(aggDesc);
+ aggregators[i] = vContext.getAggregatorExpression(aggDesc, desc.getVectorDesc().isReduce());
}
+
+ isVectorOutput = desc.getVectorDesc().isVectorOutput();
+
+ List<String> outColNames = desc.getOutputColumnNames();
+ Map<String, Integer> mapOutCols = new HashMap<String, Integer>(outColNames.size());
+ int outColIndex = 0;
+ for(String outCol: outColNames) {
+ mapOutCols.put(outCol, outColIndex++);
+ }
+ vOutContext = new VectorizationContext(mapOutCols, outColIndex);
+ vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_");
+ fileKey = vOutContext.getFileKey();
}
public VectorGroupByOperator() {
@@ -662,13 +781,23 @@ public class VectorGroupByOperator exten
objectInspectors.add(aggregators[i].getOutputObjectInspector());
}
- keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
- aggregationBatchInfo = new VectorAggregationBufferBatch();
- aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
-
+ if (!conf.getVectorDesc().isVectorGroupBatches()) {
+ // These data structures are only used by the map-side processing modes.
+ keyWrappersBatch = VectorHashKeyWrapperBatch.compileKeyWrapperBatch(keyExpressions);
+ aggregationBatchInfo = new VectorAggregationBufferBatch();
+ aggregationBatchInfo.compileAggregationBatchInfo(aggregators);
+ }
+ LOG.warn("VectorGroupByOperator is vector output " + isVectorOutput);
List<String> outputFieldNames = conf.getOutputColumnNames();
outputObjInspector = ObjectInspectorFactory.getStandardStructObjectInspector(
outputFieldNames, objectInspectors);
+ if (isVectorOutput) {
+ vrbCtx = new VectorizedRowBatchCtx();
+ vrbCtx.init(hconf, fileKey, (StructObjectInspector) outputObjInspector);
+ outputBatch = vrbCtx.createVectorizedRowBatch();
+ vectorColumnAssign = VectorColumnAssignFactory.buildAssigners(
+ outputBatch, outputObjInspector, vOutContext.getColumnMap(), conf.getOutputColumnNames());
+ }
} catch (HiveException he) {
throw he;
@@ -678,32 +807,43 @@ public class VectorGroupByOperator exten
initializeChildren(hconf);
- forwardCache =new Object[keyExpressions.length + aggregators.length];
+ forwardCache = new Object[keyExpressions.length + aggregators.length];
if (keyExpressions.length == 0) {
- processingMode = this.new ProcessingModeGlobalAggregate();
- }
- else {
- //TODO: consider if parent can offer order guarantees
- // If input is sorted, is more efficient to use the streaming mode
+ processingMode = this.new ProcessingModeGlobalAggregate();
+ } else if (conf.getVectorDesc().isVectorGroupBatches()) {
+ // Sorted GroupBy of vector batches where an individual batch has the same group key (e.g. reduce).
+ processingMode = this.new ProcessingModeGroupBatches();
+ } else {
+ // We start in hash mode and may dynamically switch to unsorted stream mode.
processingMode = this.new ProcessingModeHashAggregate();
}
processingMode.initialize(hconf);
}
/**
- * changes the processing mode to streaming
+ * changes the processing mode to unsorted streaming
* This is done at the request of the hash agg mode, if the number of keys
* exceeds the minReductionHashAggr factor
* @throws HiveException
*/
- private void changeToStreamingMode() throws HiveException {
- processingMode = this.new ProcessingModeStreaming();
+ private void changeToUnsortedStreamingMode() throws HiveException {
+ processingMode = this.new ProcessingModeUnsortedStreaming();
processingMode.initialize(null);
LOG.trace("switched to streaming mode");
}
@Override
+ public void startGroup() throws HiveException {
+ processingMode.startGroup();
+ }
+
+ @Override
+ public void endGroup() throws HiveException {
+ processingMode.endGroup();
+ }
+
+ @Override
public void processOp(Object row, int tag) throws HiveException {
VectorizedRowBatch batch = (VectorizedRowBatch) row;
@@ -719,26 +859,72 @@ public class VectorGroupByOperator exten
* @param agg
* @throws HiveException
*/
- private void flushSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
+ private void writeSingleRow(VectorHashKeyWrapper kw, VectorAggregationBufferRow agg)
throws HiveException {
int fi = 0;
- for (int i = 0; i < keyExpressions.length; ++i) {
- forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
- kw, i, keyOutputWriters[i]);
+ if (!isVectorOutput) {
+ // Output row.
+ for (int i = 0; i < keyExpressions.length; ++i) {
+ forwardCache[fi++] = keyWrappersBatch.getWritableKeyValue (
+ kw, i, keyOutputWriters[i]);
+ }
+ for (int i = 0; i < aggregators.length; ++i) {
+ forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(String.format("forwarding keys: %s: %s",
+ kw, Arrays.toString(forwardCache)));
+ }
+ forward(forwardCache, outputObjInspector);
+ } else {
+ // Output keys and aggregates into the output batch.
+ for (int i = 0; i < keyExpressions.length; ++i) {
+ vectorColumnAssign[fi++].assignObjectValue(keyWrappersBatch.getWritableKeyValue (
+ kw, i, keyOutputWriters[i]), outputBatch.size);
+ }
+ for (int i = 0; i < aggregators.length; ++i) {
+ vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
+ agg.getAggregationBuffer(i)), outputBatch.size);
+ }
+ ++outputBatch.size;
+ if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+ flushOutput();
+ }
}
+ }
+
+ /**
+ * Emits a (reduce) group row, made from the key (copied in at the beginning of the group) and
+ * the row aggregation buffer values.
+ * @param agg
+ * @param buffer
+ * @throws HiveException
+ */
+ private void writeGroupRow(VectorAggregationBufferRow agg, DataOutputBuffer buffer)
+ throws HiveException {
+ int fi = keyExpressions.length; // Start after group keys.
for (int i = 0; i < aggregators.length; ++i) {
- forwardCache[fi++] = aggregators[i].evaluateOutput(agg.getAggregationBuffer(i));
+ vectorColumnAssign[fi++].assignObjectValue(aggregators[i].evaluateOutput(
+ agg.getAggregationBuffer(i)), outputBatch.size);
}
- if (LOG.isDebugEnabled()) {
- LOG.debug(String.format("forwarding keys: %s: %s",
- kw, Arrays.toString(forwardCache)));
+ ++outputBatch.size;
+ if (outputBatch.size == VectorizedRowBatch.DEFAULT_SIZE) {
+ flushOutput();
+ buffer.reset();
}
- forward(forwardCache, outputObjInspector);
+ }
+
+ private void flushOutput() throws HiveException {
+ forward(outputBatch, null);
+ outputBatch.reset();
}
@Override
public void closeOp(boolean aborted) throws HiveException {
processingMode.close(aborted);
+ if (!aborted && isVectorOutput && outputBatch.size > 0) {
+ flushOutput();
+ }
}
static public String getOperatorName() {
@@ -761,4 +947,8 @@ public class VectorGroupByOperator exten
this.aggregators = aggregators;
}
+ @Override
+ public VectorizationContext getOuputVectorizationContext() {
+ return vOutContext;
+ }
}
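
The new ProcessingModeGroupBatches relies on the reduce-side contract that every batch between startGroup() and endGroup() carries the same group key. A condensed sketch of that lifecycle from the driver's side (hypothetical driver loop, not code from this commit):

    // All batches delivered inside one group share a single key.
    op.startGroup();               // sets inGroup = true, first = true
    for (VectorizedRowBatch batch : batchesForKey) {
      op.processOp(batch, 0);      // first batch copies the key into outputBatch;
    }                              // every batch feeds aggregators[i].aggregateInput(...)
    op.endGroup();                 // writes one row of aggregates, resets groupAggregators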
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapper.java Wed Sep 10 21:41:16 2014
@@ -60,6 +60,7 @@ public class VectorHashKeyWrapper extend
byteStarts = new int[byteValuesCount];
byteLengths = new int[byteValuesCount];
isNull = new boolean[longValuesCount + doubleValuesCount + byteValuesCount + decimalValuesCount];
+ hashcode = 0;
}
private VectorHashKeyWrapper() {
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorHashKeyWrapperBatch.java Wed Sep 10 21:41:16 2014
@@ -32,41 +32,10 @@ import org.apache.hadoop.hive.serde2.laz
* This class stores additional information about keys needed to evaluate and output the key values.
*
*/
-public class VectorHashKeyWrapperBatch {
+public class VectorHashKeyWrapperBatch extends VectorColumnSetInfo {
- /**
- * Helper class for looking up a key value based on key index.
- */
- private static class KeyLookupHelper {
- private int longIndex;
- private int doubleIndex;
- private int stringIndex;
- private int decimalIndex;
-
- private static final int INDEX_UNUSED = -1;
-
- private void resetIndices() {
- this.longIndex = this.doubleIndex = this.stringIndex = this.decimalIndex = INDEX_UNUSED;
- }
- public void setLong(int index) {
- resetIndices();
- this.longIndex= index;
- }
-
- public void setDouble(int index) {
- resetIndices();
- this.doubleIndex = index;
- }
-
- public void setString(int index) {
- resetIndices();
- this.stringIndex = index;
- }
-
- public void setDecimal(int index) {
- resetIndices();
- this.decimalIndex = index;
- }
+ public VectorHashKeyWrapperBatch(int keyCount) {
+ super(keyCount);
}
/**
@@ -80,26 +49,6 @@ public class VectorHashKeyWrapperBatch {
private VectorExpression[] keyExpressions;
/**
- * indices of LONG primitive keys.
- */
- private int[] longIndices;
-
- /**
- * indices of DOUBLE primitive keys.
- */
- private int[] doubleIndices;
-
- /**
- * indices of string (byte[]) primitive keys.
- */
- private int[] stringIndices;
-
- /**
- * indices of decimal primitive keys.
- */
- private int[] decimalIndices;
-
- /**
* Pre-allocated batch size vector of keys wrappers.
* N.B. these keys are **mutable** and should never be used in a HashMap.
* Always clone the key wrapper to obtain an immutable keywrapper suitable
@@ -108,11 +57,6 @@ public class VectorHashKeyWrapperBatch {
private VectorHashKeyWrapper[] vectorHashKeyWrappers;
/**
- * Lookup vector to map from key index to primitive type index.
- */
- private KeyLookupHelper[] indexLookup;
-
- /**
* The fixed size of the key wrappers.
*/
private int keysFixedSize;
@@ -567,53 +511,17 @@ public class VectorHashKeyWrapperBatch {
*/
public static VectorHashKeyWrapperBatch compileKeyWrapperBatch(VectorExpression[] keyExpressions)
throws HiveException {
- VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch();
+ VectorHashKeyWrapperBatch compiledKeyWrapperBatch = new VectorHashKeyWrapperBatch(keyExpressions.length);
compiledKeyWrapperBatch.keyExpressions = keyExpressions;
compiledKeyWrapperBatch.keysFixedSize = 0;
- // We'll overallocate and then shrink the array for each type
- int[] longIndices = new int[keyExpressions.length];
- int longIndicesIndex = 0;
- int[] doubleIndices = new int[keyExpressions.length];
- int doubleIndicesIndex = 0;
- int[] stringIndices = new int[keyExpressions.length];
- int stringIndicesIndex = 0;
- int[] decimalIndices = new int[keyExpressions.length];
- int decimalIndicesIndex = 0;
- KeyLookupHelper[] indexLookup = new KeyLookupHelper[keyExpressions.length];
-
// Inspect the output type of each key expression.
for(int i=0; i < keyExpressions.length; ++i) {
- indexLookup[i] = new KeyLookupHelper();
- String outputType = keyExpressions[i].getOutputType();
- if (VectorizationContext.isIntFamily(outputType) ||
- VectorizationContext.isDatetimeFamily(outputType)) {
- longIndices[longIndicesIndex] = i;
- indexLookup[i].setLong(longIndicesIndex);
- ++longIndicesIndex;
- } else if (VectorizationContext.isFloatFamily(outputType)) {
- doubleIndices[doubleIndicesIndex] = i;
- indexLookup[i].setDouble(doubleIndicesIndex);
- ++doubleIndicesIndex;
- } else if (VectorizationContext.isStringFamily(outputType)) {
- stringIndices[stringIndicesIndex]= i;
- indexLookup[i].setString(stringIndicesIndex);
- ++stringIndicesIndex;
- } else if (VectorizationContext.isDecimalFamily(outputType)) {
- decimalIndices[decimalIndicesIndex]= i;
- indexLookup[i].setDecimal(decimalIndicesIndex);
- ++decimalIndicesIndex;
- }
- else {
- throw new HiveException("Unsuported vector output type: " + outputType);
- }
- }
- compiledKeyWrapperBatch.indexLookup = indexLookup;
- compiledKeyWrapperBatch.longIndices = Arrays.copyOf(longIndices, longIndicesIndex);
- compiledKeyWrapperBatch.doubleIndices = Arrays.copyOf(doubleIndices, doubleIndicesIndex);
- compiledKeyWrapperBatch.stringIndices = Arrays.copyOf(stringIndices, stringIndicesIndex);
- compiledKeyWrapperBatch.decimalIndices = Arrays.copyOf(decimalIndices, decimalIndicesIndex);
+ compiledKeyWrapperBatch.addKey(keyExpressions[i].getOutputType());
+ }
+ compiledKeyWrapperBatch.finishAdding();
+
compiledKeyWrapperBatch.vectorHashKeyWrappers =
new VectorHashKeyWrapper[VectorizedRowBatch.DEFAULT_SIZE];
for(int i=0;i<VectorizedRowBatch.DEFAULT_SIZE; ++i) {
@@ -632,11 +540,11 @@ public class VectorHashKeyWrapperBatch {
model.memoryAlign());
// Now add the key wrapper arrays
- compiledKeyWrapperBatch.keysFixedSize += model.lengthForLongArrayOfSize(longIndicesIndex);
- compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(doubleIndicesIndex);
- compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(stringIndicesIndex);
- compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(decimalIndicesIndex);
- compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(longIndicesIndex) * 2;
+ compiledKeyWrapperBatch.keysFixedSize += model.lengthForLongArrayOfSize(compiledKeyWrapperBatch.longIndices.length);
+ compiledKeyWrapperBatch.keysFixedSize += model.lengthForDoubleArrayOfSize(compiledKeyWrapperBatch.doubleIndices.length);
+ compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.stringIndices.length);
+ compiledKeyWrapperBatch.keysFixedSize += model.lengthForObjectArrayOfSize(compiledKeyWrapperBatch.decimalIndices.length);
+ compiledKeyWrapperBatch.keysFixedSize += model.lengthForIntArrayOfSize(compiledKeyWrapperBatch.longIndices.length) * 2;
compiledKeyWrapperBatch.keysFixedSize +=
model.lengthForBooleanArrayOfSize(keyExpressions.length);
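
The per-type index bookkeeping (long/double/string/decimal arrays plus KeyLookupHelper) now lives in the new VectorColumnSetInfo base class, so compileKeyWrapperBatch shrinks to a classify-then-trim loop. Condensed from the replacement code above (fragment):

    VectorHashKeyWrapperBatch compiled = new VectorHashKeyWrapperBatch(keyExpressions.length);
    for (int i = 0; i < keyExpressions.length; ++i) {
      // addKey() buckets the key by output type and records its lookup index.
      compiled.addKey(keyExpressions[i].getOutputType());
    }
    compiled.finishAdding(); // trims the overallocated long/double/string/decimal index arrays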
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Wed Sep 10 21:41:16 2014
@@ -44,6 +44,7 @@ import org.apache.hadoop.hive.ql.exec.UD
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.InputExpressionType;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor.Mode;
import org.apache.hadoop.hive.ql.exec.vector.expressions.*;
+import org.apache.hadoop.hive.ql.exec.vector.AggregateDefinition;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFAvgDecimal;
import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorUDAFCount;
@@ -83,6 +84,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeNullDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.udf.SettableUDF;
import org.apache.hadoop.hive.ql.udf.UDFConv;
import org.apache.hadoop.hive.ql.udf.UDFHex;
@@ -198,7 +200,8 @@ public class VectorizationContext {
protected int getInputColumnIndex(String name) {
if (!columnMap.containsKey(name)) {
- LOG.error(String.format("The column %s is not in the vectorization context column map.", name));
+ LOG.error(String.format("The column %s is not in the vectorization context column map %s.",
+ name, columnMap.toString()));
}
return columnMap.get(name);
}
@@ -1880,50 +1883,55 @@ public class VectorizationContext {
}
}
+ // TODO: When we support vectorized STRUCTs and can handle more in the reduce-side (MERGEPARTIAL):
+ // TODO: Write reduce-side versions of AVG. Currently, only map-side (HASH) versions are in the table.
+ // TODO: And investigate whether different reduce-side versions are needed for var* and std*, or if the map-side aggregates can be reused. Right now they are conservatively
+ // marked map-side (HASH).
static ArrayList<AggregateDefinition> aggregatesDefinition = new ArrayList<AggregateDefinition>() {{
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFMinLong.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFMinDouble.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMinString.class));
- add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFMinDecimal.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFMaxLong.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFMaxDouble.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFMaxString.class));
- add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFMaxDecimal.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, VectorUDAFCountStar.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, VectorUDAFCount.class));
- add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFCount.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFSumLong.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFSumDouble.class));
- add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFSumDecimal.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFAvgLong.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFAvgDouble.class));
- add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFAvgDecimal.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarPopLong.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarPopLong.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarPopDouble.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarPopDouble.class));
- add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarPopDecimal.class));
- add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarPopDecimal.class));
- add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFVarSampLong.class));
- add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFVarSampDouble.class));
- add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFVarSampDecimal.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdPopLong.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdPopDouble.class));
- add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdPopDecimal.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, VectorUDAFStdSampLong.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, VectorUDAFStdSampDouble.class));
- add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, VectorUDAFStdSampDecimal.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMinLong.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMinDouble.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMinString.class));
+ add(new AggregateDefinition("min", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMinDecimal.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFMaxLong.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFMaxDouble.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, null, VectorUDAFMaxString.class));
+ add(new AggregateDefinition("max", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFMaxDecimal.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.NONE, GroupByDesc.Mode.HASH, VectorUDAFCountStar.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.MERGEPARTIAL, VectorUDAFSumLong.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.STRING_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("count", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFCount.class));
+ add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, null, VectorUDAFSumLong.class));
+ add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, null, VectorUDAFSumDouble.class));
+ add(new AggregateDefinition("sum", VectorExpressionDescriptor.ArgumentType.DECIMAL, null, VectorUDAFSumDecimal.class));
+ add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgLong.class));
+ add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFAvgDouble.class));
+ add(new AggregateDefinition("avg", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFAvgDecimal.class));
+ add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
+ add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopLong.class));
+ add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
+ add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarPopDouble.class));
+ add(new AggregateDefinition("variance", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
+ add(new AggregateDefinition("var_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarPopDecimal.class));
+ add(new AggregateDefinition("var_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampLong.class));
+ add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFVarSampDouble.class));
+ add(new AggregateDefinition("var_samp" , VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFVarSampDecimal.class));
+ add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopLong.class));
+ add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdPopDouble.class));
+ add(new AggregateDefinition("std", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("stddev", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("stddev_pop", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdPopDecimal.class));
+ add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.INT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampLong.class));
+ add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.FLOAT_FAMILY, GroupByDesc.Mode.HASH, VectorUDAFStdSampDouble.class));
+ add(new AggregateDefinition("stddev_samp", VectorExpressionDescriptor.ArgumentType.DECIMAL, GroupByDesc.Mode.HASH, VectorUDAFStdSampDecimal.class));
}};
- public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc)
+ public VectorAggregateExpression getAggregatorExpression(AggregationDesc desc, boolean isReduce)
throws HiveException {
ArrayList<ExprNodeDesc> paramDescList = desc.getParameters();
@@ -1948,8 +1956,15 @@ public class VectorizationContext {
for (AggregateDefinition aggDef : aggregatesDefinition) {
if (aggregateName.equalsIgnoreCase(aggDef.getName()) &&
((aggDef.getType() == VectorExpressionDescriptor.ArgumentType.NONE &&
- inputType == VectorExpressionDescriptor.ArgumentType.NONE) ||
+ inputType == VectorExpressionDescriptor.ArgumentType.NONE) ||
(aggDef.getType().isSameTypeOrFamily(inputType)))) {
+
+ if (aggDef.getMode() == GroupByDesc.Mode.HASH && isReduce) {
+ continue;
+ } else if (aggDef.getMode() == GroupByDesc.Mode.MERGEPARTIAL && !isReduce) {
+ continue;
+ }
+
Class<? extends VectorAggregateExpression> aggClass = aggDef.getAggClass();
try
{
@@ -1967,7 +1982,7 @@ public class VectorizationContext {
}
throw new HiveException("Vector aggregate not implemented: \"" + aggregateName +
- "\" for type: \"" + inputType.name() + "");
+ "\" for type: \"" + inputType.name() + " (reduce-side = " + isReduce + ")");
}
public Map<Integer, String> getOutputColumnTypeMap() {
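
The mode filter in getAggregatorExpression is what makes the dual count registrations above work: a HASH entry is skipped on the reduce side and a MERGEPARTIAL entry on the map side, so count resolves to VectorUDAFCount when hashing rows and to VectorUDAFSumLong when merging partial counts. A standalone sketch of that rule (the enum and method are local stand-ins, not Hive API):

    public class AggregateModeFilterSketch {
      enum Mode { HASH, MERGEPARTIAL }

      // null = usable on both sides; HASH = map side only; MERGEPARTIAL = reduce side only.
      static boolean usable(Mode mode, boolean isReduce) {
        if (mode == Mode.HASH && isReduce) return false;
        if (mode == Mode.MERGEPARTIAL && !isReduce) return false;
        return true;
      }

      public static void main(String[] args) {
        System.out.println(usable(Mode.HASH, false));        // true  -> map side counts rows
        System.out.println(usable(Mode.HASH, true));         // false -> skipped on reduce
        System.out.println(usable(Mode.MERGEPARTIAL, true)); // true  -> reduce sums partials
      }
    }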
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Wed Sep 10 21:41:16 2014
@@ -39,6 +39,7 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.IOPrepareCache;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
@@ -124,15 +125,20 @@ public class VectorizedRowBatchCtx {
* Used by non-tablescan operators when they change the vectorization context
* @param hiveConf
* @param fileKey
- * The key on which to retrieve the extra column mapping from the map scratch
+ * The key on which to retrieve the extra column mapping from the map/reduce scratch
* @param rowOI
* Object inspector that shapes the column types
*/
public void init(Configuration hiveConf, String fileKey,
StructObjectInspector rowOI) {
- columnTypeMap = Utilities
- .getMapRedWork(hiveConf).getMapWork().getScratchColumnVectorTypes()
- .get(fileKey);
+ MapredWork mapredWork = Utilities.getMapRedWork(hiveConf);
+ Map<String, Map<Integer, String>> scratchColumnVectorTypes;
+ if (mapredWork.getMapWork() != null) {
+ scratchColumnVectorTypes = mapredWork.getMapWork().getScratchColumnVectorTypes();
+ } else {
+ scratchColumnVectorTypes = mapredWork.getReduceWork().getScratchColumnVectorTypes();
+ }
+ columnTypeMap = scratchColumnVectorTypes.get(fileKey);
this.rowOI= rowOI;
this.rawRowOI = rowOI;
}
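
The fallback can be read as one selection; a hedged equivalent, assuming getMapWork() is null exactly for reduce-side work and that getScratchColumnVectorTypes() is declared on the common BaseWork parent:

    MapredWork work = Utilities.getMapRedWork(hiveConf);
    BaseWork side = (work.getMapWork() != null)
        ? work.getMapWork() : work.getReduceWork();
    // The scratch-column type map is keyed by fileKey on both sides.
    columnTypeMap = side.getScratchColumnVectorTypes().get(fileKey);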
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Wed Sep 10 21:41:16 2014
@@ -2336,6 +2336,12 @@ class RecordReaderImpl implements Record
return ((DecimalColumnStatistics) index).getMaximum();
} else if (index instanceof TimestampColumnStatistics) {
return ((TimestampColumnStatistics) index).getMaximum();
+ } else if (index instanceof BooleanColumnStatistics) {
+ if (((BooleanColumnStatistics) index).getTrueCount() != 0) {
+ return "true";
+ } else {
+ return "false";
+ }
} else {
return null;
}
@@ -2360,6 +2366,12 @@ class RecordReaderImpl implements Record
return ((DecimalColumnStatistics) index).getMinimum();
} else if (index instanceof TimestampColumnStatistics) {
return ((TimestampColumnStatistics) index).getMinimum();
+ } else if (index instanceof BooleanColumnStatistics) {
+ if (((BooleanColumnStatistics) index).getFalseCount() != 0) {
+ return "false";
+ } else {
+ return "true";
+ }
} else {
return null;
}
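
The rule behind both new branches, as a hedged standalone sketch: booleans order as false < true, so a column's maximum is "true" exactly when at least one true value was recorded, and its minimum is "false" exactly when at least one false value was recorded.

    // Illustrative helpers only; RecordReaderImpl inlines this logic above.
    static String booleanMax(BooleanColumnStatistics stats) {
      return stats.getTrueCount() != 0 ? "true" : "false";
    }
    static String booleanMin(BooleanColumnStatistics stats) {
      return stats.getFalseCount() != 0 ? "false" : "true";
    }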
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/io/sarg/SearchArgumentImpl.java Wed Sep 10 21:41:16 2014
@@ -331,6 +331,8 @@ final class SearchArgumentImpl implement
return PredicateLeaf.Type.TIMESTAMP;
case DECIMAL:
return PredicateLeaf.Type.DECIMAL;
+ case BOOLEAN:
+ return PredicateLeaf.Type.BOOLEAN;
default:
}
}
@@ -368,6 +370,7 @@ final class SearchArgumentImpl implement
case DATE:
case TIMESTAMP:
case DECIMAL:
+ case BOOLEAN:
return lit;
default:
throw new IllegalArgumentException("Unknown literal " + getType(lit));
@@ -963,7 +966,8 @@ final class SearchArgumentImpl implement
literal instanceof DateWritable ||
literal instanceof Timestamp ||
literal instanceof HiveDecimal ||
- literal instanceof BigDecimal) {
+ literal instanceof BigDecimal ||
+ literal instanceof Boolean) {
return literal;
} else if (literal instanceof HiveChar ||
literal instanceof HiveVarchar) {
@@ -1000,6 +1004,8 @@ final class SearchArgumentImpl implement
}else if (literal instanceof HiveDecimal ||
literal instanceof BigDecimal) {
return PredicateLeaf.Type.DECIMAL;
+ } else if (literal instanceof Boolean) {
+ return PredicateLeaf.Type.BOOLEAN;
}
throw new IllegalArgumentException("Unknown type for literal " + literal);
}
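
With BOOLEAN accepted both as a PredicateLeaf.Type and as a legal literal class, boolean columns can now take part in predicate pushdown. A hedged usage sketch (the column name is_deleted is made up; builder methods assumed from this branch's SearchArgumentFactory):

    SearchArgument sarg = SearchArgumentFactory.newBuilder()
        .startAnd()
        // Before this change a Boolean literal raised "Unknown type for literal".
        .equals("is_deleted", Boolean.FALSE)
        .end()
        .build();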
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/AbstractSMBJoinProc.java Wed Sep 10 21:41:16 2014
@@ -364,8 +364,10 @@ abstract public class AbstractSMBJoinPro
for (int pos = 0; pos < sortCols.size(); pos++) {
Order o = sortCols.get(pos);
- if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) {
- return false;
+ if (pos < sortColumnsFirstPartition.size()) {
+ if (o.getOrder() != sortColumnsFirstPartition.get(pos).getOrder()) {
+ return false;
+ }
}
sortColNames.add(o.getCol());
}
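
The added bounds check matters when the table declares more sort columns than the first partition recorded; previously the loop indexed past the end of sortColumnsFirstPartition. A hedged illustration, assuming the two-argument thrift constructor Order(col, order) and java.util.Arrays:

    List<Order> sortCols = Arrays.asList(new Order("a", 1), new Order("b", 1));
    List<Order> sortColumnsFirstPartition = Arrays.asList(new Order("a", 1));
    // Position 1 has no counterpart in the partition list: the guard now skips
    // the order comparison instead of throwing IndexOutOfBoundsException, while
    // "b" is still collected into sortColNames.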
Modified: hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
URL: http://svn.apache.org/viewvc/hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java?rev=1624140&r1=1624139&r2=1624140&view=diff
==============================================================================
--- hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java (original)
+++ hive/branches/cbo/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java Wed Sep 10 21:41:16 2014
@@ -38,9 +38,11 @@ import org.apache.hadoop.hive.ql.exec.mr
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.exec.vector.VectorExtractOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorExpressionDescriptor;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorizationContextRegion;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.aggregates.VectorAggregateExpression;
import org.apache.hadoop.hive.ql.lib.DefaultGraphWalker;
import org.apache.hadoop.hive.ql.lib.DefaultRuleDispatcher;
import org.apache.hadoop.hive.ql.lib.Dispatcher;
@@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.plan.Ba
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
+import org.apache.hadoop.hive.ql.plan.GroupByDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
@@ -71,6 +74,7 @@ import org.apache.hadoop.hive.ql.plan.SM
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.TezWork;
+import org.apache.hadoop.hive.ql.plan.VectorGroupByDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.UDFAcos;
import org.apache.hadoop.hive.ql.udf.UDFAsin;
@@ -290,23 +294,26 @@ public class Vectorizer implements Physi
throws SemanticException {
Task<? extends Serializable> currTask = (Task<? extends Serializable>) nd;
if (currTask instanceof MapRedTask) {
- convertMapWork(((MapRedTask) currTask).getWork().getMapWork());
+ convertMapWork(((MapRedTask) currTask).getWork().getMapWork(), false);
} else if (currTask instanceof TezTask) {
TezWork work = ((TezTask) currTask).getWork();
for (BaseWork w: work.getAllWork()) {
if (w instanceof MapWork) {
- convertMapWork((MapWork)w);
+ convertMapWork((MapWork) w, true);
} else if (w instanceof ReduceWork) {
// We are only vectorizing Reduce under Tez.
- convertReduceWork((ReduceWork)w);
+ if (HiveConf.getBoolVar(pctx.getConf(),
+ HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED)) {
+ convertReduceWork((ReduceWork) w);
+ }
}
}
}
return null;
}
- private void convertMapWork(MapWork mapWork) throws SemanticException {
- boolean ret = validateMapWork(mapWork);
+ private void convertMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+ boolean ret = validateMapWork(mapWork, isTez);
if (ret) {
vectorizeMapWork(mapWork);
}
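
Reduce-side vectorization is thus double-gated: it only runs under Tez, and only when the new flag is enabled. A hedged sketch of probing the gate (the property string is assumed from HiveConf naming conventions; the diff only shows the ConfVars constant):

    HiveConf conf = new HiveConf();
    // Presumably surfaced as hive.vectorized.execution.reduce.enabled.
    boolean reduceVectorization = HiveConf.getBoolVar(
        conf, HiveConf.ConfVars.HIVE_VECTORIZATION_REDUCE_ENABLED);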
@@ -319,7 +326,8 @@ public class Vectorizer implements Physi
+ ReduceSinkOperator.getOperatorName()), np);
}
- private boolean validateMapWork(MapWork mapWork) throws SemanticException {
+ private boolean validateMapWork(MapWork mapWork, boolean isTez) throws SemanticException {
+ LOG.info("Validating MapWork...");
// Validate the input format
for (String path : mapWork.getPathToPartitionInfo().keySet()) {
@@ -333,7 +341,7 @@ public class Vectorizer implements Physi
}
}
Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
- MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor();
+ MapWorkValidationNodeProcessor vnp = new MapWorkValidationNodeProcessor(isTez);
addMapWorkRules(opRules, vnp);
Dispatcher disp = new DefaultRuleDispatcher(vnp, opRules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
@@ -417,9 +425,12 @@ public class Vectorizer implements Physi
private void addReduceWorkRules(Map<Rule, NodeProcessor> opRules, NodeProcessor np) {
opRules.put(new RuleRegExp("R1", ExtractOperator.getOperatorName() + ".*"), np);
opRules.put(new RuleRegExp("R2", GroupByOperator.getOperatorName() + ".*"), np);
+ opRules.put(new RuleRegExp("R3", SelectOperator.getOperatorName() + ".*"), np);
}
private boolean validateReduceWork(ReduceWork reduceWork) throws SemanticException {
+ LOG.info("Validating ReduceWork...");
+
// Validate input to ReduceWork.
if (!getOnlyStructObjectInspectors(reduceWork)) {
return false;
@@ -487,16 +498,21 @@ public class Vectorizer implements Physi
class MapWorkValidationNodeProcessor implements NodeProcessor {
+ private boolean isTez;
+
+ public MapWorkValidationNodeProcessor(boolean isTez) {
+ this.isTez = isTez;
+ }
+
@Override
public Object process(Node nd, Stack<Node> stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
for (Node n : stack) {
Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
- if ((op.getType().equals(OperatorType.REDUCESINK) || op.getType().equals(OperatorType.FILESINK)) &&
- op.getParentOperators().get(0).getType().equals(OperatorType.GROUPBY)) {
+ if (nonVectorizableChildOfGroupBy(op)) {
return new Boolean(true);
}
- boolean ret = validateMapWorkOperator(op);
+ boolean ret = validateMapWorkOperator(op, isTez);
if (!ret) {
LOG.info("MapWork Operator: " + op.getName() + " could not be vectorized.");
return new Boolean(false);
@@ -513,6 +529,9 @@ public class Vectorizer implements Physi
Object... nodeOutputs) throws SemanticException {
for (Node n : stack) {
Operator<? extends OperatorDesc> op = (Operator<? extends OperatorDesc>) n;
+ if (nonVectorizableChildOfGroupBy(op)) {
+ return new Boolean(true);
+ }
boolean ret = validateReduceWorkOperator(op);
if (!ret) {
LOG.info("ReduceWork Operator: " + op.getName() + " could not be vectorized.");
@@ -579,21 +598,6 @@ public class Vectorizer implements Physi
return vContext;
}
- public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
- Operator<? extends OperatorDesc> currentOp = op;
- while (currentOp.getParentOperators().size() > 0) {
- currentOp = currentOp.getParentOperators().get(0);
- if (currentOp.getType().equals(OperatorType.GROUPBY)) {
- // No need to vectorize
- if (!opsDone.contains(op)) {
- opsDone.add(op);
- }
- return true;
- }
- }
- return false;
- }
-
public Operator<? extends OperatorDesc> doVectorize(Operator<? extends OperatorDesc> op, VectorizationContext vContext)
throws SemanticException {
Operator<? extends OperatorDesc> vectorOp = op;
@@ -665,9 +669,13 @@ public class Vectorizer implements Physi
assert vContext != null;
- // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize
- // any operators below GROUPBY.
+ // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+ // vectorize the operators below it.
if (nonVectorizableChildOfGroupBy(op)) {
+ // No need to vectorize
+ if (!opsDone.contains(op)) {
+ opsDone.add(op);
+ }
return null;
}
@@ -719,13 +727,22 @@ public class Vectorizer implements Physi
assert vContext != null;
- // Currently, Vectorized GROUPBY outputs rows, not vectorized row batchs. So, don't vectorize
- // any operators below GROUPBY.
+ // When Vectorized GROUPBY outputs rows instead of vectorized row batches, we don't
+ // vectorize the operators below it.
if (nonVectorizableChildOfGroupBy(op)) {
+ // No need to vectorize
+ if (!opsDone.contains(op)) {
+ opsDone.add(op);
+ }
return null;
}
Operator<? extends OperatorDesc> vectorOp = doVectorize(op, vContext);
+ if (vectorOp instanceof VectorGroupByOperator) {
+ VectorGroupByOperator groupBy = (VectorGroupByOperator) vectorOp;
+ VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc();
+ vectorDesc.setVectorGroupBatches(true);
+ }
if (saveRootVectorOp && op != vectorOp) {
rootVectorOp = vectorOp;
}
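
Taken together, the reduce-side path drives the vectorized GROUP BY through three VectorGroupByDesc flags, all visible in this and the following hunks; a condensed sketch:

    VectorGroupByDesc vectorDesc = groupBy.getConf().getVectorDesc();
    vectorDesc.setVectorOutput(true);        // aggregates emit primitive types, so
                                             // downstream operators stay vectorized
    vectorDesc.setVectorGroupBatches(true);  // sorted input is processed as key groups
    vectorDesc.setIsReduce(true);            // marks the operator as reduce-side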
@@ -772,7 +789,7 @@ public class Vectorizer implements Physi
return pctx;
}
- boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op) {
+ boolean validateMapWorkOperator(Operator<? extends OperatorDesc> op, boolean isTez) {
boolean ret = false;
switch (op.getType()) {
case MAPJOIN:
@@ -783,7 +800,7 @@ public class Vectorizer implements Physi
}
break;
case GROUPBY:
- ret = validateGroupByOperator((GroupByOperator) op);
+ ret = validateGroupByOperator((GroupByOperator) op, false, isTez);
break;
case FILTER:
ret = validateFilterOperator((FilterOperator) op);
@@ -814,6 +831,17 @@ public class Vectorizer implements Physi
case EXTRACT:
ret = validateExtractOperator((ExtractOperator) op);
break;
+ case MAPJOIN:
+ // Does MAPJOIN actually get planned in Reduce?
+ if (op instanceof MapJoinOperator) {
+ ret = validateMapJoinOperator((MapJoinOperator) op);
+ } else if (op instanceof SMBMapJoinOperator) {
+ ret = validateSMBMapJoinOperator((SMBMapJoinOperator) op);
+ }
+ break;
+ case GROUPBY:
+ ret = validateGroupByOperator((GroupByOperator) op, true, true);
+ break;
case FILTER:
ret = validateFilterOperator((FilterOperator) op);
break;
@@ -836,6 +864,23 @@ public class Vectorizer implements Physi
return ret;
}
+ public Boolean nonVectorizableChildOfGroupBy(Operator<? extends OperatorDesc> op) {
+ Operator<? extends OperatorDesc> currentOp = op;
+ while (currentOp.getParentOperators().size() > 0) {
+ currentOp = currentOp.getParentOperators().get(0);
+ if (currentOp.getType().equals(OperatorType.GROUPBY)) {
+ GroupByDesc desc = (GroupByDesc)currentOp.getConf();
+ boolean isVectorOutput = desc.getVectorDesc().isVectorOutput();
+ if (isVectorOutput) {
+ // This GROUP BY does vectorize its output.
+ return false;
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
private boolean validateSMBMapJoinOperator(SMBMapJoinOperator op) {
SMBJoinDesc desc = op.getConf();
// Validation is the same as for map join, since the 'small' tables are not vectorized
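
The relocated nonVectorizableChildOfGroupBy now inspects the ancestor's descriptor instead of unconditionally blocking; a hedged restatement (groupByAncestor is a stand-in for the GROUP BY found on the first-parent chain):

    GroupByDesc ancestorDesc = (GroupByDesc) groupByAncestor.getConf();
    // Children stay row-mode only when the nearest GROUP BY emits rows,
    // not vectorized batches.
    boolean blocksVectorization = !ancestorDesc.getVectorDesc().isVectorOutput();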
@@ -886,16 +931,57 @@ public class Vectorizer implements Physi
return validateExprNodeDesc(desc, VectorExpressionDescriptor.Mode.FILTER);
}
- private boolean validateGroupByOperator(GroupByOperator op) {
- if (op.getConf().isGroupingSetsPresent()) {
- LOG.warn("Grouping sets not supported in vector mode");
+ private boolean validateGroupByOperator(GroupByOperator op, boolean isReduce, boolean isTez) {
+ GroupByDesc desc = op.getConf();
+ VectorGroupByDesc vectorDesc = desc.getVectorDesc();
+
+ if (desc.isGroupingSetsPresent()) {
+ LOG.info("Grouping sets not supported in vector mode");
return false;
}
- boolean ret = validateExprNodeDesc(op.getConf().getKeys());
+ boolean ret = validateExprNodeDesc(desc.getKeys());
if (!ret) {
return false;
}
- return validateAggregationDesc(op.getConf().getAggregators());
+ ret = validateAggregationDesc(desc.getAggregators(), isReduce);
+ if (!ret) {
+ return false;
+ }
+ boolean isVectorOutput = isTez && aggregatorsOutputIsPrimitive(desc.getAggregators(), isReduce);
+ vectorDesc.setVectorOutput(isVectorOutput);
+ if (isReduce) {
+ if (desc.isDistinct()) {
+ LOG.info("Distinct not supported in reduce vector mode");
+ return false;
+ }
+ // Sort-based GroupBy?
+ if (desc.getMode() != GroupByDesc.Mode.COMPLETE &&
+ desc.getMode() != GroupByDesc.Mode.PARTIAL1 &&
+ desc.getMode() != GroupByDesc.Mode.PARTIAL2 &&
+ desc.getMode() != GroupByDesc.Mode.MERGEPARTIAL) {
+ LOG.info("Reduce vector mode not supported when input for GROUP BY not sorted");
+ return false;
+ }
+ LOG.info("Reduce GROUP BY mode is " + desc.getMode().name());
+ if (desc.getGroupKeyNotReductionKey()) {
+ LOG.info("Reduce vector mode not supported when group key is not reduction key");
+ return false;
+ }
+ if (!isVectorOutput) {
+ LOG.info("Reduce vector mode only supported when aggregate outputs are primitive types");
+ return false;
+ }
+ if (desc.getKeys().size() > 0) {
+ LOG.info("Reduce-side GROUP BY will process key groups");
+ vectorDesc.setVectorGroupBatches(true);
+ } else {
+ LOG.info("Reduce-side GROUP BY will do global aggregation");
+ }
+ vectorDesc.setIsReduce(true);
+ } else {
+ LOG.info("Downstream operators of map-side GROUP BY will be vectorized: " + isVectorOutput);
+ }
+ return true;
}
private boolean validateExtractOperator(ExtractOperator op) {
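
The mode check above whitelists exactly the modes whose input arrives sorted; a hedged equivalent using java.util.EnumSet:

    private static final EnumSet<GroupByDesc.Mode> SORTED_INPUT_MODES =
        EnumSet.of(GroupByDesc.Mode.COMPLETE, GroupByDesc.Mode.PARTIAL1,
            GroupByDesc.Mode.PARTIAL2, GroupByDesc.Mode.MERGEPARTIAL);

    boolean sortedInput = SORTED_INPUT_MODES.contains(desc.getMode());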
@@ -930,9 +1016,9 @@ public class Vectorizer implements Physi
return true;
}
- private boolean validateAggregationDesc(List<AggregationDesc> descs) {
+ private boolean validateAggregationDesc(List<AggregationDesc> descs, boolean isReduce) {
for (AggregationDesc d : descs) {
- boolean ret = validateAggregationDesc(d);
+ boolean ret = validateAggregationDesc(d, isReduce);
if (!ret) {
return false;
}
@@ -952,9 +1038,7 @@ public class Vectorizer implements Physi
String typeName = desc.getTypeInfo().getTypeName();
boolean ret = validateDataType(typeName);
if (!ret) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Cannot vectorize " + desc.toString() + " of type " + typeName);
- }
+ LOG.info("Cannot vectorize " + desc.toString() + " of type " + typeName);
return false;
}
if (desc instanceof ExprNodeGenericFuncDesc) {
@@ -987,12 +1071,11 @@ public class Vectorizer implements Physi
VectorizationContext vc = new ValidatorVectorizationContext();
if (vc.getVectorExpression(desc, mode) == null) {
// TODO: this cannot happen - VectorizationContext throws in such cases.
+ LOG.info("getVectorExpression returned null");
return false;
}
} catch (Exception e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Failed to vectorize", e);
- }
+ LOG.info("Failed to vectorize", e);
return false;
}
return true;
@@ -1011,16 +1094,56 @@ public class Vectorizer implements Physi
}
}
- private boolean validateAggregationDesc(AggregationDesc aggDesc) {
+ private boolean validateAggregationDesc(AggregationDesc aggDesc, boolean isReduce) {
if (!supportedAggregationUdfs.contains(aggDesc.getGenericUDAFName().toLowerCase())) {
return false;
}
if (aggDesc.getParameters() != null) {
return validateExprNodeDesc(aggDesc.getParameters());
}
+ // See if we can vectorize the aggregation.
+ try {
+ VectorizationContext vc = new ValidatorVectorizationContext();
+ if (vc.getAggregatorExpression(aggDesc, isReduce) == null) {
+ // TODO: this cannot happen - VectorizationContext throws in such cases.
+ LOG.info("getAggregatorExpression returned null");
+ return false;
+ }
+ } catch (Exception e) {
+ LOG.info("Failed to vectorize", e);
+ return false;
+ }
+ return true;
+ }
+
+ private boolean aggregatorsOutputIsPrimitive(List<AggregationDesc> descs, boolean isReduce) {
+ for (AggregationDesc d : descs) {
+ boolean ret = aggregatorsOutputIsPrimitive(d, isReduce);
+ if (!ret) {
+ return false;
+ }
+ }
return true;
}
+ private boolean aggregatorsOutputIsPrimitive(AggregationDesc aggDesc, boolean isReduce) {
+ VectorizationContext vc = new ValidatorVectorizationContext();
+ VectorAggregateExpression vectorAggrExpr;
+ try {
+ vectorAggrExpr = vc.getAggregatorExpression(aggDesc, isReduce);
+ } catch (Exception e) {
+ // We should have already attempted to vectorize in validateAggregationDesc.
+ LOG.info("Vectorization of aggreation should have succeeded ", e);
+ return false;
+ }
+
+ ObjectInspector outputObjInspector = vectorAggrExpr.getOutputObjectInspector();
+ if (outputObjInspector.getCategory() == ObjectInspector.Category.PRIMITIVE) {
+ return true;
+ }
+ return false;
+ }
+
private boolean validateDataType(String type) {
return supportedDataTypesPattern.matcher(type.toLowerCase()).matches();
}