Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/01 06:48:48 UTC
svn commit: r1527883 [3/6] - in /hive/branches/tez: ./
ant/src/org/apache/hadoop/hive/ant/
beeline/src/java/org/apache/hive/beeline/ bin/
common/src/java/org/apache/hadoop/hive/conf/ conf/
contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base...
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java Tue Oct 1 04:48:44 2013
@@ -21,6 +21,9 @@ package org.apache.hadoop.hive.metastore
import static org.apache.commons.lang.StringUtils.join;
import static org.apache.commons.lang.StringUtils.repeat;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
@@ -30,10 +33,11 @@ import java.util.TreeMap;
import javax.jdo.PersistenceManager;
import javax.jdo.Query;
+import javax.jdo.Transaction;
+import javax.jdo.datastore.JDOConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.hadoop.hive.metastore.api.Order;
@@ -63,9 +67,87 @@ class MetaStoreDirectSql {
private static final Log LOG = LogFactory.getLog(MetaStoreDirectSql.class);
private final PersistenceManager pm;
+ /**
+ * We want to avoid db-specific code in this class and stick with ANSI SQL. However, MySQL
+ * and Postgres are ANSI-incompatible in different ways (MySQL by default doesn't support
+ * quoted identifiers, and Postgres contravenes ANSI by coercing unquoted ones to lower
+ * case). MySQL's way of working around this is simpler (just set ANSI quotes mode on), so
+ * we will use that. MySQL detection is done by actually issuing the set-ansi-quotes command.
+ */
+ private final boolean isMySql;
+
+ /**
+ * Whether direct SQL can be used with the current datastore backing {@link #pm}.
+ */
+ private final boolean isCompatibleDatastore;
+
+ // TODO: we might also want to work around the strange and arguably non-standard behavior
+ // of postgres where it rolls back a tx after a failed select (see SQL92 4.28, on page 69
+ // about implicit rollbacks; 4.10.1 last paragraph for the "spirit" of the standard).
+ // See #canUseDirectSql in ObjectStore, isActiveTransaction is undesirable but unavoidable
+ // for postgres; in MySQL and other databases we could avoid it.
public MetaStoreDirectSql(PersistenceManager pm) {
this.pm = pm;
+ Transaction tx = pm.currentTransaction();
+ tx.begin();
+ boolean isMySql = false;
+ try {
+ trySetAnsiQuotesForMysql();
+ isMySql = true;
+ } catch (SQLException sqlEx) {
+ LOG.info("MySQL check failed, assuming we are not on mysql: " + sqlEx.getMessage());
+ tx.rollback();
+ tx = pm.currentTransaction();
+ tx.begin();
+ }
+ // This should work. If it doesn't, we will self-disable. What a PITA...
+ boolean isCompatibleDatastore = false;
+ String selfTestQuery = "select \"DB_ID\" from \"DBS\"";
+ try {
+ pm.newQuery("javax.jdo.query.SQL", selfTestQuery).execute();
+ isCompatibleDatastore = true;
+ tx.commit();
+ } catch (Exception ex) {
+ LOG.error("Self-test query [" + selfTestQuery + "] failed; direct SQL is disabled", ex);
+ tx.rollback();
+ }
+
+ this.isCompatibleDatastore = isCompatibleDatastore;
+ this.isMySql = isMySql;
+ }
+
+ public boolean isCompatibleDatastore() {
+ return isCompatibleDatastore;
+ }
+
+ /**
+ * See {@link #trySetAnsiQuotesForMysql()}.
+ */
+ private void setAnsiQuotesForMysql() throws MetaException {
+ try {
+ trySetAnsiQuotesForMysql();
+ } catch (SQLException sqlEx) {
+ throw new MetaException("Error setting ansi quotes: " + sqlEx.getMessage());
+ }
+ }
+
+ /**
+ * MySQL, by default, doesn't recognize ANSI quotes, which we need for Postgres compatibility.
+ * Try to set the ANSI quotes mode on for the session. Due to connection pooling, this needs
+ * to be called in the same transaction as the actual queries.
+ */
+ private void trySetAnsiQuotesForMysql() throws SQLException {
+ final String queryText = "SET @@session.sql_mode=ANSI_QUOTES";
+ JDOConnection jdoConn = pm.getDataStoreConnection();
+ boolean doTrace = LOG.isDebugEnabled();
+ try {
+ long start = doTrace ? System.nanoTime() : 0;
+ ((Connection)jdoConn.getNativeConnection()).createStatement().execute(queryText);
+ timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
+ } finally {
+ jdoConn.close(); // We must release the connection before we call other pm methods.
+ }
}
/**
@@ -83,7 +165,8 @@ class MetaStoreDirectSql {
}
String list = repeat(",?", partNames.size()).substring(1);
return getPartitionsViaSqlFilterInternal(dbName, tblName, null,
- "and PARTITIONS.PART_NAME in (" + list + ")", partNames, new ArrayList<String>(), max);
+ "and \"PARTITIONS\".\"PART_NAME\" in (" + list + ")",
+ partNames, new ArrayList<String>(), max);
}
/**
@@ -124,9 +207,9 @@ class MetaStoreDirectSql {
}
private boolean isViewTable(String dbName, String tblName) throws MetaException {
- String queryText = "select TBL_TYPE from TBLS" +
- " inner join DBS on TBLS.DB_ID = DBS.DB_ID " +
- " where TBLS.TBL_NAME = ? and DBS.NAME = ?";
+ String queryText = "select \"TBL_TYPE\" from \"TBLS\"" +
+ " inner join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\" " +
+ " where \"TBLS\".\"TBL_NAME\" = ? and \"DBS\".\"NAME\" = ?";
Object[] params = new Object[] { tblName, dbName };
Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
query.setUnique(true);
@@ -155,7 +238,11 @@ class MetaStoreDirectSql {
dbName = dbName.toLowerCase();
tblName = tblName.toLowerCase();
// We have to be mindful of order during filtering if we are not returning all partitions.
- String orderForFilter = (max != null) ? " order by PART_NAME asc" : "";
+ String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : "";
+ if (isMySql) {
+ assert pm.currentTransaction().isActive();
+ setAnsiQuotesForMysql(); // must be inside tx together with queries
+ }
// Get all simple fields for partitions and related objects, which we can map one-on-one.
// We will do this in 2 queries to use different existing indices for each one.
@@ -163,13 +250,13 @@ class MetaStoreDirectSql {
// TODO: We might want to tune the indexes instead. With current ones MySQL performs
// poorly, esp. with 'order by' w/o index on large tables, even if the number of actual
// results is small (query that returns 8 out of 32k partitions can go 4sec. to 0sec. by
- // just adding a PART_ID IN (...) filter that doesn't alter the results to it, probably
+ // just adding a \"PART_ID\" IN (...) filter that doesn't alter the results to it, probably
// causing it to not sort the entire table due to not knowing how selective the filter is.
String queryText =
- "select PARTITIONS.PART_ID from PARTITIONS"
- + " inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID "
- + " inner join DBS on TBLS.DB_ID = DBS.DB_ID "
- + join(joinsForFilter, ' ') + " where TBLS.TBL_NAME = ? and DBS.NAME = ? "
+ "select \"PARTITIONS\".\"PART_ID\" from \"PARTITIONS\""
+ + " inner join \"TBLS\" on \"PARTITIONS\".\"TBL_ID\" = \"TBLS\".\"TBL_ID\" "
+ + " inner join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\" "
+ + join(joinsForFilter, ' ') + " where \"TBLS\".\"TBL_NAME\" = ? and \"DBS\".\"NAME\" = ? "
+ (sqlFilter == null ? "" : sqlFilter) + orderForFilter;
Object[] params = new Object[paramsForFilter.size() + 2];
params[0] = tblName;
@@ -203,14 +290,15 @@ class MetaStoreDirectSql {
// Now get most of the other fields.
queryText =
- "select PARTITIONS.PART_ID, SDS.SD_ID, SDS.CD_ID, SERDES.SERDE_ID, "
- + " PARTITIONS.CREATE_TIME, PARTITIONS.LAST_ACCESS_TIME, SDS.INPUT_FORMAT, "
- + " SDS.IS_COMPRESSED, SDS.IS_STOREDASSUBDIRECTORIES, SDS.LOCATION, SDS.NUM_BUCKETS, "
- + " SDS.OUTPUT_FORMAT, SERDES.NAME, SERDES.SLIB "
- + "from PARTITIONS"
- + " left outer join SDS on PARTITIONS.SD_ID = SDS.SD_ID "
- + " left outer join SERDES on SDS.SERDE_ID = SERDES.SERDE_ID "
- + "where PART_ID in (" + partIds + ") order by PART_NAME asc";
+ "select \"PARTITIONS\".\"PART_ID\", \"SDS\".\"SD_ID\", \"SDS\".\"CD_ID\","
+ + " \"SERDES\".\"SERDE_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+ + " \"PARTITIONS\".\"LAST_ACCESS_TIME\", \"SDS\".\"INPUT_FORMAT\", \"SDS\".\"IS_COMPRESSED\","
+ + " \"SDS\".\"IS_STOREDASSUBDIRECTORIES\", \"SDS\".\"LOCATION\", \"SDS\".\"NUM_BUCKETS\","
+ + " \"SDS\".\"OUTPUT_FORMAT\", \"SERDES\".\"NAME\", \"SERDES\".\"SLIB\" "
+ + "from \"PARTITIONS\""
+ + " left outer join \"SDS\" on \"PARTITIONS\".\"SD_ID\" = \"SDS\".\"SD_ID\" "
+ + " left outer join \"SERDES\" on \"SDS\".\"SERDE_ID\" = \"SERDES\".\"SERDE_ID\" "
+ + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc";
start = doTrace ? System.nanoTime() : 0;
query = pm.newQuery("javax.jdo.query.SQL", queryText);
@SuppressWarnings("unchecked")
@@ -254,8 +342,8 @@ class MetaStoreDirectSql {
part.setValues(new ArrayList<String>());
part.setDbName(dbName);
part.setTableName(tblName);
- if (fields[4] != null) part.setCreateTime((Integer)fields[4]);
- if (fields[5] != null) part.setLastAccessTime((Integer)fields[5]);
+ if (fields[4] != null) part.setCreateTime(extractSqlInt(fields[4]));
+ if (fields[5] != null) part.setLastAccessTime(extractSqlInt(fields[5]));
partitions.put(partitionId, part);
if (sdId == null) continue; // Probably a view.
@@ -279,7 +367,7 @@ class MetaStoreDirectSql {
tmpBoolean = extractSqlBoolean(fields[8]);
if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean);
sd.setLocation((String)fields[9]);
- if (fields[10] != null) sd.setNumBuckets((Integer)fields[10]);
+ if (fields[10] != null) sd.setNumBuckets(extractSqlInt(fields[10]));
sd.setOutputFormat((String)fields[11]);
sdSb.append(sdId).append(",");
part.setSd(sd);
@@ -309,15 +397,17 @@ class MetaStoreDirectSql {
timingTrace(doTrace, queryText, start, queryTime);
// Now get all the one-to-many things. Start with partitions.
- queryText = "select PART_ID, PARAM_KEY, PARAM_VALUE from PARTITION_PARAMS where PART_ID in ("
- + partIds + ") and PARAM_KEY is not null order by PART_ID asc";
+ queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from \"PARTITION_PARAMS\""
+ + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null"
+ + " order by \"PART_ID\" asc";
loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
public void apply(Partition t, Object[] fields) {
t.putToParameters((String)fields[1], (String)fields[2]);
}});
- queryText = "select PART_ID, PART_KEY_VAL from PARTITION_KEY_VALS where PART_ID in ("
- + partIds + ") and INTEGER_IDX >= 0 order by PART_ID asc, INTEGER_IDX asc";
+ queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from \"PARTITION_KEY_VALS\""
+ + " where \"PART_ID\" in (" + partIds + ") and \"INTEGER_IDX\" >= 0"
+ + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc";
loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
public void apply(Partition t, Object[] fields) {
t.addToValues((String)fields[1]);
@@ -332,33 +422,35 @@ class MetaStoreDirectSql {
colIds = trimCommaList(colsSb);
// Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs.
- queryText = "select SD_ID, PARAM_KEY, PARAM_VALUE from SD_PARAMS where SD_ID in ("
- + sdIds + ") and PARAM_KEY is not null order by SD_ID asc";
+ queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from \"SD_PARAMS\""
+ + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null"
+ + " order by \"SD_ID\" asc";
loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
public void apply(StorageDescriptor t, Object[] fields) {
t.putToParameters((String)fields[1], (String)fields[2]);
}});
- // Note that SORT_COLS has "ORDER" column, which is not SQL92-legal. We have two choices
- // here - drop SQL92, or get '*' and be broken on certain schema changes. We do the latter.
- queryText = "select SD_ID, COLUMN_NAME, SORT_COLS.* from SORT_COLS where SD_ID in ("
- + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+ queryText = "select \"SD_ID\", \"COLUMN_NAME\", \"SORT_COLS\".\"ORDER\" from \"SORT_COLS\""
+ + " where \"SD_ID\" in (" + sdIds + ") and \"INTEGER_IDX\" >= 0"
+ + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
public void apply(StorageDescriptor t, Object[] fields) {
- if (fields[4] == null) return;
- t.addToSortCols(new Order((String)fields[1], (Integer)fields[4]));
+ if (fields[2] == null) return;
+ t.addToSortCols(new Order((String)fields[1], extractSqlInt(fields[2])));
}});
- queryText = "select SD_ID, BUCKET_COL_NAME from BUCKETING_COLS where SD_ID in ("
- + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+ queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from \"BUCKETING_COLS\""
+ + " where \"SD_ID\" in (" + sdIds + ") and \"INTEGER_IDX\" >= 0"
+ + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
public void apply(StorageDescriptor t, Object[] fields) {
t.addToBucketCols((String)fields[1]);
}});
// Skewed columns stuff.
- queryText = "select SD_ID, SKEWED_COL_NAME from SKEWED_COL_NAMES where SD_ID in ("
- + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+ queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from \"SKEWED_COL_NAMES\""
+ + " where \"SD_ID\" in (" + sdIds + ") and \"INTEGER_IDX\" >= 0"
+ + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
boolean hasSkewedColumns =
loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
public void apply(StorageDescriptor t, Object[] fields) {
@@ -370,16 +462,17 @@ class MetaStoreDirectSql {
if (hasSkewedColumns) {
// We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
queryText =
- "select SKEWED_VALUES.SD_ID_OID, SKEWED_STRING_LIST_VALUES.STRING_LIST_ID, "
- + " SKEWED_STRING_LIST_VALUES.STRING_LIST_VALUE "
- + "from SKEWED_VALUES "
- + " left outer join SKEWED_STRING_LIST_VALUES on "
- + " SKEWED_VALUES.STRING_LIST_ID_EID = SKEWED_STRING_LIST_VALUES.STRING_LIST_ID "
- + "where SKEWED_VALUES.SD_ID_OID in (" + sdIds + ") "
- + " and SKEWED_VALUES.STRING_LIST_ID_EID is not null "
- + " and SKEWED_VALUES.INTEGER_IDX >= 0 "
- + "order by SKEWED_VALUES.SD_ID_OID asc, SKEWED_VALUES.INTEGER_IDX asc, "
- + " SKEWED_STRING_LIST_VALUES.INTEGER_IDX asc";
+ "select \"SKEWED_VALUES\".\"SD_ID_OID\","
+ + " \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_ID\","
+ + " \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_VALUE\" "
+ + "from \"SKEWED_VALUES\" "
+ + " left outer join \"SKEWED_STRING_LIST_VALUES\" on \"SKEWED_VALUES\"."
+ + "\"STRING_LIST_ID_EID\" = \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_ID\" "
+ + "where \"SKEWED_VALUES\".\"SD_ID_OID\" in (" + sdIds + ") "
+ + " and \"SKEWED_VALUES\".\"STRING_LIST_ID_EID\" is not null "
+ + " and \"SKEWED_VALUES\".\"INTEGER_IDX\" >= 0 "
+ + "order by \"SKEWED_VALUES\".\"SD_ID_OID\" asc, \"SKEWED_VALUES\".\"INTEGER_IDX\" asc,"
+ + " \"SKEWED_STRING_LIST_VALUES\".\"INTEGER_IDX\" asc";
loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
private Long currentListId;
private List<String> currentList;
@@ -404,16 +497,18 @@ class MetaStoreDirectSql {
// We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
queryText =
- "select SKEWED_COL_VALUE_LOC_MAP.SD_ID, SKEWED_STRING_LIST_VALUES.STRING_LIST_ID,"
- + " SKEWED_COL_VALUE_LOC_MAP.LOCATION, SKEWED_STRING_LIST_VALUES.STRING_LIST_VALUE "
- + "from SKEWED_COL_VALUE_LOC_MAP"
- + " left outer join SKEWED_STRING_LIST_VALUES on SKEWED_COL_VALUE_LOC_MAP."
- + "STRING_LIST_ID_KID = SKEWED_STRING_LIST_VALUES.STRING_LIST_ID "
- + "where SKEWED_COL_VALUE_LOC_MAP.SD_ID in (" + sdIds + ")"
- + " and SKEWED_COL_VALUE_LOC_MAP.STRING_LIST_ID_KID is not null "
- + "order by SKEWED_COL_VALUE_LOC_MAP.SD_ID asc,"
- + " SKEWED_STRING_LIST_VALUES.STRING_LIST_ID asc,"
- + " SKEWED_STRING_LIST_VALUES.INTEGER_IDX asc";
+ "select \"SKEWED_COL_VALUE_LOC_MAP\".\"SD_ID\","
+ + " \"SKEWED_STRING_LIST_VALUES\".STRING_LIST_ID,"
+ + " \"SKEWED_COL_VALUE_LOC_MAP\".\"LOCATION\","
+ + " \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_VALUE\" "
+ + "from \"SKEWED_COL_VALUE_LOC_MAP\""
+ + " left outer join \"SKEWED_STRING_LIST_VALUES\" on \"SKEWED_COL_VALUE_LOC_MAP\"."
+ + "\"STRING_LIST_ID_KID\" = \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_ID\" "
+ + "where \"SKEWED_COL_VALUE_LOC_MAP\".\"SD_ID\" in (" + sdIds + ")"
+ + " and \"SKEWED_COL_VALUE_LOC_MAP\".\"STRING_LIST_ID_KID\" is not null "
+ + "order by \"SKEWED_COL_VALUE_LOC_MAP\".\"SD_ID\" asc,"
+ + " \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_ID\" asc,"
+ + " \"SKEWED_STRING_LIST_VALUES\".\"INTEGER_IDX\" asc";
loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
private Long currentListId;
@@ -447,8 +542,9 @@ class MetaStoreDirectSql {
// Get FieldSchema stuff if any.
if (!colss.isEmpty()) {
// We are skipping the CDS table here, as it seems to be totally useless.
- queryText = "select CD_ID, COMMENT, COLUMN_NAME, TYPE_NAME from COLUMNS_V2 where CD_ID in ("
- + colIds + ") and INTEGER_IDX >= 0 order by CD_ID asc, INTEGER_IDX asc";
+ queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\""
+ + " from \"COLUMNS_V2\" where \"CD_ID\" in (" + colIds + ") and \"INTEGER_IDX\" >= 0"
+ + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc";
loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
public void apply(List<FieldSchema> t, Object[] fields) {
t.add(new FieldSchema((String)fields[2], (String)fields[3], (String)fields[1]));
@@ -456,8 +552,9 @@ class MetaStoreDirectSql {
}
// Finally, get all the stuff for serdes - just the params.
- queryText = "select SERDE_ID, PARAM_KEY, PARAM_VALUE from SERDE_PARAMS where SERDE_ID in ("
- + serdeIds + ") and PARAM_KEY is not null order by SERDE_ID asc";
+ queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from \"SERDE_PARAMS\""
+ + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null"
+ + " order by \"SERDE_ID\" asc";
loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
public void apply(SerDeInfo t, Object[] fields) {
t.putToParameters((String)fields[1], (String)fields[2]);
@@ -469,7 +566,7 @@ class MetaStoreDirectSql {
private void timingTrace(boolean doTrace, String queryText, long start, long queryTime) {
if (!doTrace) return;
LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
- (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [ " + queryText + "]");
+ (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]");
}
private static Boolean extractSqlBoolean(Object value) throws MetaException {
@@ -486,6 +583,10 @@ class MetaStoreDirectSql {
throw new MetaException("Cannot extrace boolean from column value " + value);
}
+ private int extractSqlInt(Object field) {
+ return ((Number)field).intValue();
+ }
+
private static String trimCommaList(StringBuilder sb) {
if (sb.length() > 0) {
sb.setLength(sb.length() - 1);
@@ -632,12 +733,12 @@ class MetaStoreDirectSql {
}
}
if (joins.get(partColIndex) == null) {
- joins.set(partColIndex, "inner join PARTITION_KEY_VALS as FILTER" + partColIndex
- + " on FILTER" + partColIndex + ".PART_ID = PARTITIONS.PART_ID and FILTER"
- + partColIndex + ".INTEGER_IDX = " + partColIndex);
+ joins.set(partColIndex, "inner join \"PARTITION_KEY_VALS\" as \"FILTER" + partColIndex
+ + "\" on \"FILTER" + partColIndex + "\".\"PART_ID\" = \"PARTITIONS\".\"PART_ID\""
+ + " and \"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
}
- String tableValue = "FILTER" + partColIndex + ".PART_KEY_VAL";
+ String tableValue = "\"FILTER" + partColIndex + "\".\"PART_KEY_VAL\"";
// TODO: need casts here if #doesOperatorSupportIntegral is amended to include lt/gt/etc.
filterBuffer.append(node.isReverseOrder
? "(? " + node.operator.getSqlOp() + " " + tableValue + ")"
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Tue Oct 1 04:48:44 2013
@@ -2115,6 +2115,7 @@ public class ObjectStore implements RawS
// TODO: Drop table can be very slow on large tables, we might want to address this.
return allowSql
&& HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)
+ && directSql.isCompatibleDatastore()
&& !isActiveTransaction();
}
Modified: hive/branches/tez/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/build.xml?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/build.xml (original)
+++ hive/branches/tez/ql/build.xml Tue Oct 1 04:48:44 2013
@@ -190,7 +190,7 @@
<echo message="Project: ${ant.project.name}"/>
<javac
encoding="${build.encoding}"
- srcdir="${src.dir}:${basedir}/src/gen/thrift/gen-javabean:${build.dir}/gen/antlr/gen-java:${protobuf.build.dir}"
+ srcdir="${src.dir}:${basedir}/src/gen/thrift/gen-javabean:${build.dir}/gen/antlr/gen-java:${protobuf.build.dir}:${build.dir}/gen/vector"
includes="**/*.java"
destdir="${build.classes}"
debug="${javac.debug}"
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Tue Oct 1 04:48:44 2013
@@ -362,6 +362,7 @@ public enum ErrorMsg {
UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported"),
INVALID_BIGTABLE_MAPJOIN(10246, "{0} table chosen for streaming is not valid", true),
MISSING_OVER_CLAUSE(10247, "Missing over clause for function : "),
+ PARTITION_SPEC_TYPE_MISMATCH(10248, "Cannot add partition column {0} of type {1} as it cannot be converted to type {2}", true),
INVALID_HDFS_URI(10248, "{0} is not a hdfs uri", true),
INVALID_DIR(10249, "{0} is not a directory", true),
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Tue Oct 1 04:48:44 2013
@@ -373,6 +373,12 @@ public class FetchOperator implements Se
job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
.toString()));
+ // The fetch operator is not vectorized, so turn the vectorization flag off here to ensure
+ // that a non-vectorized record reader is created below.
+ if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+ HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ }
+
PartitionDesc partDesc;
if (currTbl == null) {
partDesc = currPart;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Oct 1 04:48:44 2013
@@ -35,11 +35,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.FileUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -84,11 +86,13 @@ public class FileSinkOperator extends Te
protected transient int dpStartCol; // start column # for DP columns
protected transient List<String> dpVals; // array of values corresponding to DP columns
protected transient List<Object> dpWritables;
- protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
+ protected transient FSRecordWriter[] rowOutWriters; // row specific RecordWriters
protected transient int maxPartitions;
protected transient ListBucketingCtx lbCtx;
protected transient boolean isSkewedStoredAsSubDirectories;
- private transient boolean statsCollectRawDataSize;
+ protected transient boolean statsCollectRawDataSize;
+ private transient boolean[] statsFromRecordWriter;
+ private transient boolean isCollectRWStats;
private static final transient String[] FATAL_ERR_MSG = {
@@ -96,22 +100,12 @@ public class FileSinkOperator extends Te
"Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions.pernode."
};
- /**
- * RecordWriter.
- *
- */
- public static interface RecordWriter {
- void write(Writable w) throws IOException;
-
- void close(boolean abort) throws IOException;
- }
-
public class FSPaths implements Cloneable {
Path tmpPath;
Path taskOutputTempPath;
Path[] outPaths;
Path[] finalPaths;
- RecordWriter[] outWriters;
+ FSRecordWriter[] outWriters;
Stat stat;
public FSPaths() {
@@ -122,7 +116,7 @@ public class FileSinkOperator extends Te
taskOutputTempPath = Utilities.toTaskTempPath(specPath);
outPaths = new Path[numFiles];
finalPaths = new Path[numFiles];
- outWriters = new RecordWriter[numFiles];
+ outWriters = new FSRecordWriter[numFiles];
stat = new Stat();
}
@@ -166,11 +160,11 @@ public class FileSinkOperator extends Te
}
}
- public void setOutWriters(RecordWriter[] out) {
+ public void setOutWriters(FSRecordWriter[] out) {
outWriters = out;
}
- public RecordWriter[] getOutWriters() {
+ public FSRecordWriter[] getOutWriters() {
return outWriters;
}
@@ -221,6 +215,10 @@ public class FileSinkOperator extends Te
}
}
}
+
+ public Stat getStat() {
+ return stat;
+ }
} // class FSPaths
private static final long serialVersionUID = 1L;
@@ -228,7 +226,7 @@ public class FileSinkOperator extends Te
protected transient Serializer serializer;
protected transient BytesWritable commonKey = new BytesWritable();
protected transient TableIdEnum tabIdEnum = null;
- private transient LongWritable row_count;
+ protected transient LongWritable row_count;
private transient boolean isNativeTable = true;
/**
@@ -237,17 +235,17 @@ public class FileSinkOperator extends Te
* each reducer can write 10 files - this way we effectively get 1000 files.
*/
private transient ExprNodeEvaluator[] partitionEval;
- private transient int totalFiles;
+ protected transient int totalFiles;
private transient int numFiles;
- private transient boolean multiFileSpray;
- private transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+ protected transient boolean multiFileSpray;
+ protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
private transient ObjectInspector[] partitionObjectInspectors;
- private transient HivePartitioner<HiveKey, Object> prtner;
- private transient final HiveKey key = new HiveKey();
+ protected transient HivePartitioner<HiveKey, Object> prtner;
+ protected transient final HiveKey key = new HiveKey();
private transient Configuration hconf;
- private transient FSPaths fsp;
- private transient boolean bDynParts;
+ protected transient FSPaths fsp;
+ protected transient boolean bDynParts;
private transient SubStructObjectInspector subSetOI;
private transient int timeOut; // JT timeout in msec.
private transient long lastProgressReport = System.currentTimeMillis();
@@ -279,7 +277,7 @@ public class FileSinkOperator extends Te
Class<? extends Writable> outputClass;
String taskId;
- private boolean filesCreated = false;
+ protected boolean filesCreated = false;
private void initializeSpecPath() {
// For a query of the type:
@@ -324,6 +322,7 @@ public class FileSinkOperator extends Te
isCompressed = conf.getCompressed();
parent = Utilities.toTempPath(conf.getDirName());
statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
+ statsFromRecordWriter = new boolean[numFiles];
serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
serializer.initialize(null, conf.getTableInfo().getProperties());
@@ -432,7 +431,7 @@ public class FileSinkOperator extends Te
}
}
- private void createBucketFiles(FSPaths fsp) throws HiveException {
+ protected void createBucketFiles(FSPaths fsp) throws HiveException {
try {
int filesIdx = 0;
Set<Integer> seenBuckets = new HashSet<Integer>();
@@ -516,6 +515,8 @@ public class FileSinkOperator extends Te
fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
reporter);
+ // If the record writer provides stats, get it from there instead of the serde
+ statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
// increment the CREATED_FILES counter
if (reporter != null) {
reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
@@ -544,7 +545,7 @@ public class FileSinkOperator extends Te
*
* @return true if a new progress update is reported, false otherwise.
*/
- private boolean updateProgress() {
+ protected boolean updateProgress() {
if (reporter != null &&
(System.currentTimeMillis() - lastProgressReport) > timeOut) {
reporter.progress();
@@ -555,7 +556,7 @@ public class FileSinkOperator extends Te
}
}
- Writable recordValue;
+ protected Writable recordValue;
@Override
public void processOp(Object row, int tag) throws HiveException {
@@ -619,7 +620,11 @@ public class FileSinkOperator extends Te
}
rowOutWriters = fpaths.outWriters;
- if (conf.isGatherStats()) {
+ // Check if all record writers implement statistics. If at least one RW
+ // doesn't implement the stats interface, we fall back to the conventional
+ // way of gathering stats.
+ isCollectRWStats = areAllTrue(statsFromRecordWriter);
+ if (conf.isGatherStats() && !isCollectRWStats) {
if (statsCollectRawDataSize) {
SerDeStats stats = serializer.getSerDeStats();
if (stats != null) {
@@ -630,12 +635,14 @@ public class FileSinkOperator extends Te
}
+ FSRecordWriter rowOutWriter = null;
+
if (row_count != null) {
row_count.set(row_count.get() + 1);
}
if (!multiFileSpray) {
- rowOutWriters[0].write(recordValue);
+ rowOutWriter = rowOutWriters[0];
} else {
int keyHashCode = 0;
for (int i = 0; i < partitionEval.length; i++) {
@@ -646,8 +653,9 @@ public class FileSinkOperator extends Te
key.setHashCode(keyHashCode);
int bucketNum = prtner.getBucket(key, null, totalFiles);
int idx = bucketMap.get(bucketNum);
- rowOutWriters[idx].write(recordValue);
+ rowOutWriter = rowOutWriters[idx];
}
+ rowOutWriter.write(recordValue);
} catch (IOException e) {
throw new HiveException(e);
} catch (SerDeException e) {
@@ -655,13 +663,22 @@ public class FileSinkOperator extends Te
}
}
+ private boolean areAllTrue(boolean[] statsFromRW) {
+ for(boolean b : statsFromRW) {
+ if (!b) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Lookup list bucketing path.
* @param lbDirName
* @return
* @throws HiveException
*/
- private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
+ protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
FSPaths fsp2 = valToPaths.get(lbDirName);
if (fsp2 == null) {
fsp2 = createNewPaths(lbDirName);
@@ -699,7 +716,7 @@ public class FileSinkOperator extends Te
* @param row row to process.
* @return directory name.
*/
- private String generateListBucketingDirName(Object row) {
+ protected String generateListBucketingDirName(Object row) {
if (!this.isSkewedStoredAsSubDirectories) {
return null;
}
@@ -740,7 +757,7 @@ public class FileSinkOperator extends Te
return lbDirName;
}
- private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
+ protected FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
FSPaths fp;
@@ -864,6 +881,27 @@ public class FileSinkOperator extends Te
if (!abort) {
for (FSPaths fsp : valToPaths.values()) {
fsp.closeWriters(abort);
+
+ // Before closing the operator, check whether statistics gathering is requested
+ // and provided by the record writer. This is different from the statistics
+ // gathering done in processOp(): there, serde statistics are gathered for each
+ // row added and accumulated in a hashmap, which adds overhead to the actual
+ // processing of the row. If the record writer already gathers the statistics,
+ // it can simply return the accumulated statistics, which are aggregated here in
+ // the case of spray writers.
+ if (conf.isGatherStats() && isCollectRWStats) {
+ for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+ FSRecordWriter outWriter = fsp.outWriters[idx];
+ if (outWriter != null) {
+ SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+ if (stats != null) {
+ fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+ fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+ }
+ }
+ }
+ }
+
if (isNativeTable) {
fsp.commit(fs);
}
@@ -934,7 +972,7 @@ public class FileSinkOperator extends Te
hiveOutputFormat = ReflectionUtils.newInstance(conf.getTableInfo().getOutputFileFormatClass(),job);
}
else {
- hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
+ hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
}
}
else {
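
The stats changes above replace per-row serde accounting with a single read from the writer at close time, once every writer for the sink implements the stats interface. A minimal sketch of such a writer, assuming the FSRecordWriter and FSRecordWriter.StatsProvidingRecordWriter contracts referenced in this diff (write(Writable), close(boolean), getStats() returning SerDeStats); the class itself is hypothetical and not part of the patch:

    // Hedged sketch: a record writer that tracks its own statistics so that
    // FileSinkOperator can set isCollectRWStats, skip per-row serde stats, and
    // read the totals once from getStats() in closeOp().
    import java.io.IOException;
    import org.apache.hadoop.hive.ql.io.FSRecordWriter;
    import org.apache.hadoop.hive.serde2.SerDeStats;
    import org.apache.hadoop.io.Writable;

    public class CountingRecordWriter
        implements FSRecordWriter, FSRecordWriter.StatsProvidingRecordWriter {

      private final SerDeStats stats = new SerDeStats();

      @Override
      public void write(Writable row) throws IOException {
        // ... append the row to the underlying file ...
        stats.setRowCount(stats.getRowCount() + 1);
        // A real writer would also accumulate the raw data size here.
      }

      @Override
      public void close(boolean abort) throws IOException {
        // ... flush and close the underlying file ...
      }

      @Override
      public SerDeStats getStats() {
        return stats;
      }
    }
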
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Tue Oct 1 04:48:44 2013
@@ -46,13 +46,14 @@ public class FilterOperator extends Oper
FILTERED, PASSED
}
- private final transient LongWritable filtered_count, passed_count;
+ protected final transient LongWritable filtered_count;
+ protected final transient LongWritable passed_count;
private transient ExprNodeEvaluator conditionEvaluator;
private transient PrimitiveObjectInspector conditionInspector;
private transient int consecutiveFails;
private transient int consecutiveSearches;
private transient IOContext ioContext;
- transient int heartbeatInterval;
+ protected transient int heartbeatInterval;
public FilterOperator() {
super();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Tue Oct 1 04:48:44 2013
@@ -141,8 +141,16 @@ public class GroupByOperator extends Ope
transient StructObjectInspector newKeyObjectInspector;
transient StructObjectInspector currentKeyObjectInspector;
public static MemoryMXBean memoryMXBean;
- private long maxMemory;
- private float memoryThreshold;
+
+ /**
+ * Total amount of memory allowed for JVM heap.
+ */
+ protected long maxMemory;
+
+ /**
+ * Configured percentage of memory usable by the query processor (QP).
+ */
+ protected float memoryThreshold;
private boolean groupingSetsPresent;
private int groupingSetsPosition;
@@ -159,10 +167,18 @@ public class GroupByOperator extends Ope
transient List<Field>[] aggrPositions;
transient int fixedRowSize;
- transient long maxHashTblMemory;
+
+ /**
+ * Max memory usable by the hashtable before it should flush.
+ */
+ protected transient long maxHashTblMemory;
transient int totalVariableSize;
transient int numEntriesVarSize;
- transient int numEntriesHashTable;
+
+ /**
+ * Current number of entries in the hash table.
+ */
+ protected transient int numEntriesHashTable;
transient int countAfterReport; // report or forward
transient int heartbeatInterval;
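
For the memory fields documented above, the hash aggregation budget is essentially memoryThreshold applied to the JVM heap, and the table is flushed once its estimated footprint reaches that budget. A rough worked example (all numbers illustrative, not from the patch):

    // Hedged sketch of the flush budget implied by maxMemory / memoryThreshold /
    // maxHashTblMemory above; the real operator also estimates variable-size
    // columns, this only shows the shape of the arithmetic.
    public class HashAggrBudgetSketch {
      public static void main(String[] args) {
        long maxMemory = 1024L * 1024 * 1024;     // assume a 1 GB heap
        float memoryThreshold = 0.5f;             // assumed config value
        long maxHashTblMemory = (long) (memoryThreshold * maxMemory);
        int fixedRowSize = 100;                   // assumed bytes per hash table entry
        long entriesBeforeFlush = maxHashTblMemory / fixedRowSize;
        System.out.println("flush after ~" + entriesBeforeFlush + " entries");  // ~5.4 million
      }
    }
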
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java Tue Oct 1 04:48:44 2013
@@ -22,9 +22,9 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
public abstract class KeyWrapper {
- abstract void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException;
- abstract void setHashKey();
- abstract KeyWrapper copyKey();
- abstract void copyKey(KeyWrapper oldWrapper);
- abstract Object[] getKeyArray();
+ public abstract void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException;
+ public abstract void setHashKey();
+ public abstract KeyWrapper copyKey();
+ public abstract void copyKey(KeyWrapper oldWrapper);
+ public abstract Object[] getKeyArray();
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct 1 04:48:44 2013
@@ -103,7 +103,7 @@ public abstract class Operator<T extends
protected transient State state = State.UNINIT;
- static transient boolean fatalError = false; // fatalError is shared acorss
+ protected static transient boolean fatalError = false; // fatalError is shared across
// all operators
static {
@@ -1448,6 +1448,60 @@ public abstract class Operator<T extends
return ret;
}
+ /**
+ * Clones only the operator. The children and parent are set
+ * to null.
+ * @return Cloned operator
+ * @throws CloneNotSupportedException
+ */
+ public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
+ T descClone = (T) conf.clone();
+ Operator<? extends OperatorDesc> ret =
+ (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
+ descClone, getSchema());
+ return ret;
+ }
+
+ /**
+ * Recursively clones all the children of the tree. Fixes the pointers to children and
+ * parents, and the pointers to itself coming from the children. It does not fix the
+ * pointers to itself coming from parents; the parents continue to point to the original
+ * child.
+ * @return Cloned operator
+ * @throws CloneNotSupportedException
+ */
+ public Operator<? extends OperatorDesc> cloneRecursiveChildren()
+ throws CloneNotSupportedException {
+ Operator<? extends OperatorDesc> newOp = this.cloneOp();
+ newOp.setParentOperators(this.parentOperators);
+ // Fix parent in all children
+ if (this.getChildOperators() == null) {
+ newOp.setChildOperators(null);
+ return newOp;
+ }
+ List<Operator<? extends OperatorDesc>> newChildren =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+
+ for (Operator<? extends OperatorDesc> childOp : this.getChildOperators()) {
+ List<Operator<? extends OperatorDesc>> parentList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Operator<? extends OperatorDesc> parent : childOp.getParentOperators()) {
+ if (parent.equals(this)) {
+ parentList.add(newOp);
+ } else {
+ parentList.add(parent);
+ }
+ }
+ // Recursively clone the children
+ Operator<? extends OperatorDesc> clonedChildOp = childOp.cloneRecursiveChildren();
+ clonedChildOp.setParentOperators(parentList);
+ }
+
+ newOp.setChildOperators(newChildren);
+ return newOp;
+ }
+
+
/*
* True only for operators which produce at most 1 output row per input
* row to it. This will allow the output column names to be directly
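
As the javadoc for cloneRecursiveChildren() spells out, the clone's downward pointers are fixed up but the original parents still list the old child, so a caller that wants the clone spliced into the plan has to repair the parent side itself. A hedged usage sketch (the helper method is hypothetical; getParentOperators()/getChildOperators() are the operator's existing accessors, and java.util.List / OperatorDesc imports are assumed):

    // Hedged sketch: clone a sub-tree rooted at 'op' and splice the clone into
    // the plan in place of the original, fixing the parent -> child pointers
    // that cloneRecursiveChildren() deliberately leaves untouched.
    static Operator<? extends OperatorDesc> replaceWithClone(
        Operator<? extends OperatorDesc> op) throws CloneNotSupportedException {
      Operator<? extends OperatorDesc> clone = op.cloneRecursiveChildren();
      if (op.getParentOperators() != null) {
        for (Operator<? extends OperatorDesc> parent : op.getParentOperators()) {
          List<Operator<? extends OperatorDesc>> children = parent.getChildOperators();
          children.set(children.indexOf(op), clone);
        }
      }
      return clone;
    }
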
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Oct 1 04:48:44 2013
@@ -22,12 +22,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
-import org.apache.hadoop.hive.ql.plan.MuxDesc;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
-import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
@@ -39,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.La
import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MuxDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -106,6 +112,38 @@ public final class OperatorFactory {
MuxOperator.class));
}
+ public static ArrayList<OpTuple> vectorOpvec;
+ static {
+ vectorOpvec = new ArrayList<OpTuple>();
+ vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
+ vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
+ vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class,
+ VectorReduceSinkOperator.class));
+ vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
+ vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
+ }
+
+ public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
+ VectorizationContext vContext) {
+ Class<T> descClass = (Class<T>) conf.getClass();
+ for (OpTuple o : vectorOpvec) {
+ if (o.descClass == descClass) {
+ try {
+ Operator<T> op = (Operator<T>) o.opClass.getDeclaredConstructor(
+ VectorizationContext.class, OperatorDesc.class).newInstance(
+ vContext, conf);
+ op.initializeCounters();
+ return op;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ throw new RuntimeException("No vector operator for descriptor class "
+ + descClass.getName());
+ }
+
public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
for (OpTuple o : opvec) {
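
getVectorOperator() above is a straightforward descriptor-class dispatch: it walks vectorOpvec and reflectively invokes the (VectorizationContext, OperatorDesc) constructor of the matching vectorized operator. A hedged sketch of how a planner pass might use it to swap a row-mode operator for its vectorized counterpart (the surrounding method and variables are hypothetical; only getVectorOperator itself comes from this patch):

    // Hedged sketch: replace a row-mode operator with the vectorized operator
    // built by OperatorFactory.getVectorOperator() from the same descriptor.
    static Operator<? extends OperatorDesc> vectorize(
        Operator<? extends OperatorDesc> rowOp, VectorizationContext vContext) {
      // The factory picks VectorSelectOperator, VectorFilterOperator, etc. from
      // vectorOpvec based on the descriptor's runtime class.
      Operator<? extends OperatorDesc> vectorOp =
          OperatorFactory.getVectorOperator(rowOp.getConf(), vContext);
      // Re-wire the new operator into the plan where the row-mode one sat.
      vectorOp.setParentOperators(rowOp.getParentOperators());
      vectorOp.setChildOperators(rowOp.getChildOperators());
      return vectorOp;
    }
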
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Oct 1 04:48:44 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
/**
@@ -72,11 +73,11 @@ public class ReduceSinkOperator extends
// TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
// ready
- transient Serializer keySerializer;
- transient boolean keyIsText;
- transient Serializer valueSerializer;
+ protected transient Serializer keySerializer;
+ protected transient boolean keyIsText;
+ protected transient Serializer valueSerializer;
transient int tag;
- transient byte[] tagByte = new byte[1];
+ protected transient byte[] tagByte = new byte[1];
transient protected int numDistributionKeys;
transient protected int numDistinctExprs;
transient String inputAlias; // input alias of this RS for join (used for PPD)
@@ -163,12 +164,15 @@ public class ReduceSinkOperator extends
}
transient InspectableObject tempInspectableObject = new InspectableObject();
- transient HiveKey keyWritable = new HiveKey();
+ protected transient HiveKey keyWritable = new HiveKey();
+ protected transient Writable value;
transient StructObjectInspector keyObjectInspector;
transient StructObjectInspector valueObjectInspector;
transient ObjectInspector[] partitionObjectInspectors;
+ protected transient Object[] cachedValues;
+ protected transient List<List<Integer>> distinctColIndices;
/**
* This two dimensional array holds key data and a corresponding Union object
* which contains the tag identifying the aggregate expression for distinct columns.
@@ -183,13 +187,9 @@ public class ReduceSinkOperator extends
* in this case, child GBY evaluates distinct values with expression like KEY.col2:0.dist1
* see {@link ExprNodeColumnEvaluator}
*/
- transient Object[][] cachedKeys;
- transient Object[] cachedValues;
- transient List<List<Integer>> distinctColIndices;
-
+ protected transient Object[][] cachedKeys;
boolean firstRow;
-
- transient Random random;
+ protected transient Random random;
/**
* Initializes array of ExprNodeEvaluator. Adds Union field for distinct
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Tue Oct 1 04:48:44 2013
@@ -80,15 +80,20 @@ public class UnionOperator extends Opera
for (int p = 0; p < parents; p++) {
assert (parentFields[p].size() == columns);
for (int c = 0; c < columns; c++) {
- columnTypeResolvers[c].update(parentFields[p].get(c)
- .getFieldObjectInspector());
+ if (!columnTypeResolvers[c].update(parentFields[p].get(c)
+ .getFieldObjectInspector())) {
+ // checked in SemanticAnalyzer. Should not happen
+ throw new HiveException("Incompatible types for union operator");
+ }
}
}
ArrayList<ObjectInspector> outputFieldOIs = new ArrayList<ObjectInspector>(
columns);
for (int c = 0; c < columns; c++) {
- outputFieldOIs.add(columnTypeResolvers[c].get());
+ // can be null for void type
+ ObjectInspector oi = columnTypeResolvers[c].get();
+ outputFieldOIs.add(oi == null ? parentFields[0].get(c).getFieldObjectInspector() : oi);
}
// create output row ObjectInspector
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct 1 04:48:44 2013
@@ -102,13 +102,13 @@ import org.apache.hadoop.hive.ql.Context
import org.apache.hadoop.hive.ql.Driver;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -572,7 +572,7 @@ public final class Utilities {
}
}
- private static Path getPlanPath(Configuration conf) {
+ public static Path getPlanPath(Configuration conf) {
String plan = HiveConf.getVar(conf, HiveConf.ConfVars.PLAN);
if (plan != null && !plan.isEmpty()) {
return new Path(plan);
@@ -1697,7 +1697,7 @@ public final class Utilities {
for (String p : paths) {
Path path = new Path(p);
- RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+ FSRecordWriter writer = HiveFileFormatUtils.getRecordWriter(
jc, hiveOutputFormat, outputClass, isCompressed,
tableInfo.getProperties(), path, reporter);
writer.close(false);
@@ -2883,7 +2883,7 @@ public final class Utilities {
Path newFilePath = new Path(newFile);
String onefile = newPath.toString();
- RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+ FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
Text.class, false, props, null);
if (dummyRow) {
// empty files are omitted at CombineHiveInputFormat.
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Oct 1 04:48:44 2013
@@ -237,6 +237,7 @@ public class ExecDriver extends Task<Map
ShimLoader.getHadoopShims().prepareJobOutput(job);
//See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput()
job.setOutputFormat(HiveOutputFormatImpl.class);
+
job.setMapperClass(ExecMapper.class);
job.setMapOutputKeyClass(HiveKey.class);
@@ -828,3 +829,4 @@ public class ExecDriver extends Task<Map
}
}
}
+
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java Tue Oct 1 04:48:44 2013
@@ -28,8 +28,8 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
@@ -240,7 +240,7 @@ public class PTFRowContainer<Row extends
}
- private static class PTFRecordWriter implements RecordWriter {
+ private static class PTFRecordWriter implements FSRecordWriter {
BytesWritable EMPTY_KEY = new BytesWritable();
SequenceFile.Writer outStream;
@@ -262,7 +262,7 @@ public class PTFRowContainer<Row extends
extends HiveSequenceFileOutputFormat<K,V> {
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Oct 1 04:48:44 2013
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -105,7 +105,7 @@ public class RowContainer<ROW extends Li
int acutalSplitNum = 0;
int currentSplitPointer = 0;
org.apache.hadoop.mapred.RecordReader rr = null; // record reader
- RecordWriter rw = null;
+ FSRecordWriter rw = null;
InputFormat<WritableComparable, Writable> inputFormat = null;
InputSplit[] inputSplits = null;
private ROW dummyRow = null;
@@ -531,7 +531,7 @@ public class RowContainer<ROW extends Li
}
- protected RecordWriter getRecordWriter() {
+ protected FSRecordWriter getRecordWriter() {
return rw;
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java Tue Oct 1 04:48:44 2013
@@ -24,7 +24,6 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
@@ -43,7 +42,7 @@ public class HiveBinaryOutputFormat<K ex
/**
* create the final out file, and output row by row. After one row is
* appended, a configured row separator is appended
- *
+ *
* @param jc
* the job configuration file
* @param outPath
@@ -59,14 +58,14 @@ public class HiveBinaryOutputFormat<K ex
* @return the RecordWriter
*/
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
FileSystem fs = outPath.getFileSystem(jc);
final OutputStream outStream = fs.create(outPath);
- return new RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
if (r instanceof Text) {
Text tr = (Text) r;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Tue Oct 1 04:48:44 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -246,7 +245,7 @@ public final class HiveFileFormatUtils {
return true;
}
- public static RecordWriter getHiveRecordWriter(JobConf jc,
+ public static FSRecordWriter getHiveRecordWriter(JobConf jc,
TableDesc tableInfo, Class<? extends Writable> outputClass,
FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
boolean storagehandlerofhivepassthru = false;
@@ -287,7 +286,7 @@ public final class HiveFileFormatUtils {
}
}
- public static RecordWriter getRecordWriter(JobConf jc,
+ public static FSRecordWriter getRecordWriter(JobConf jc,
HiveOutputFormat<?, ?> hiveOutputFormat,
final Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProp, Path outPath, Reporter reporter
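
Callers outside the individual output formats go through these two static helpers, which now
hand back an FSRecordWriter as well. A rough caller-side sketch (the closing parenthesis and
throws clause of getRecordWriter fall outside the visible context; jc, tableProps and outPath
are assumed to be set up elsewhere, and Reporter.NULL is just a convenient stand-in):

  FSRecordWriter rw = HiveFileFormatUtils.getRecordWriter(
      jc, new HiveSequenceFileOutputFormat<BytesWritable, Writable>(),
      BytesWritable.class, false, tableProps, outPath, Reporter.NULL);
  rw.write(new BytesWritable("row".getBytes()));
  rw.close(false); // close(boolean abort), carried over from the old RecordWriter contract (assumed)
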
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java Tue Oct 1 04:48:44 2013
@@ -25,7 +25,6 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
@@ -39,7 +38,7 @@ import org.apache.hadoop.util.Progressab
/**
* HiveIgnoreKeyTextOutputFormat replaces key with null before feeding the <key,
* value> to TextOutputFormat.RecordWriter.
- *
+ *
*/
public class HiveIgnoreKeyTextOutputFormat<K extends WritableComparable, V extends Writable>
extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {
@@ -47,7 +46,7 @@ public class HiveIgnoreKeyTextOutputForm
/**
* create the final out file, and output row by row. After one row is
* appended, a configured row separator is appended
- *
+ *
* @param jc
* the job configuration file
* @param outPath
@@ -63,7 +62,7 @@ public class HiveIgnoreKeyTextOutputForm
* @return the RecordWriter
*/
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
int rowSeparator = 0;
@@ -79,7 +78,7 @@ public class HiveIgnoreKeyTextOutputForm
FileSystem fs = outPath.getFileSystem(jc);
final OutputStream outStream = Utilities.createCompressedStream(jc, fs
.create(outPath), isCompressed);
- return new RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
if (r instanceof Text) {
Text tr = (Text) r;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java Tue Oct 1 04:48:44 2013
@@ -23,7 +23,6 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.NullWritable;
@@ -48,7 +47,7 @@ public class HiveNullValueSequenceFileOu
private boolean keyIsText;
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
@@ -58,7 +57,7 @@ public class HiveNullValueSequenceFileOu
keyWritable = new HiveKey();
keyIsText = valueClass.equals(Text.class);
- return new RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
if (keyIsText) {
Text text = (Text) r;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java Tue Oct 1 04:48:44 2013
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.Properties;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
@@ -58,7 +57,7 @@ public interface HiveOutputFormat<K, V>
* progress used for status report
* @return the RecordWriter for the output file
*/
- RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
final Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException;
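
For anyone implementing the interface, the shape is unchanged apart from the return type. A
minimal sketch of a custom HiveOutputFormat against the new signature (all names here are
illustrative, and close(boolean abort) is assumed to carry over from the old nested
RecordWriter; it is not shown in these hunks):

  public class PlainTextOutputFormat<K extends WritableComparable, V extends Writable>
      extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {
    @Override
    public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
        Class<? extends Writable> valueClass, boolean isCompressed,
        Properties tableProperties, Progressable progress) throws IOException {
      final OutputStream out = finalOutPath.getFileSystem(jc).create(finalOutPath);
      return new FSRecordWriter() {
        public void write(Writable r) throws IOException {
          out.write(r.toString().getBytes());
          out.write('\n');
        }
        public void close(boolean abort) throws IOException {
          out.close();
        }
      };
    }
  }
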
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java Tue Oct 1 04:48:44 2013
@@ -49,7 +49,7 @@ public class HivePassThroughOutputFormat
"org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat";
public static final String HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY =
- "hive.passthrough.storagehandler.of";
+ "hive.passthrough.storagehandler.of";
public HivePassThroughOutputFormat() {
//construct this class through ReflectionUtils from FileSinkOperator
@@ -99,7 +99,7 @@ public class HivePassThroughOutputFormat
}
@Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
if (this.initialized == false) {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java Tue Oct 1 04:48:44 2013
@@ -20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io;
import java.io.IOException;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
public class HivePassThroughRecordWriter <K extends WritableComparable<?>, V extends Writable>
-implements RecordWriter {
+implements FSRecordWriter {
private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java Tue Oct 1 04:48:44 2013
@@ -23,7 +23,6 @@ import java.util.Properties;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
@@ -56,7 +55,7 @@ public class HiveSequenceFileOutputForma
* @return the RecordWriter for the output file
*/
@Override
- public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+ public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
Class<? extends Writable> valueClass, boolean isCompressed,
Properties tableProperties, Progressable progress) throws IOException {
@@ -64,7 +63,7 @@ public class HiveSequenceFileOutputForma
final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
- return new RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
outStream.append(EMPTY_KEY, r);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Tue Oct 1 04:48:44 2013
@@ -118,7 +118,7 @@ public class RCFileOutputFormat extends
* @throws IOException
*/
@Override
- public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+ public FSRecordWriter getHiveRecordWriter(
JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass,
boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {
@@ -135,7 +135,7 @@ public class RCFileOutputFormat extends
(jc, finalOutPath.getFileSystem(jc),
finalOutPath, isCompressed);
- return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+ return new FSRecordWriter() {
public void write(Writable r) throws IOException {
outWriter.append(r);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java Tue Oct 1 04:48:44 2013
@@ -17,6 +17,14 @@
*/
package org.apache.hadoop.hive.ql.io.avro;
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+
+import java.io.IOException;
+import java.util.Properties;
+
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
@@ -24,7 +32,7 @@ import org.apache.avro.generic.GenericDa
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
@@ -36,14 +44,6 @@ import org.apache.hadoop.mapred.RecordWr
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Progressable;
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
-import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
-
/**
* Write to an Avro file from a Hive process.
*/
@@ -51,7 +51,7 @@ public class AvroContainerOutputFormat
implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
@Override
- public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
+ public FSRecordWriter getHiveRecordWriter(JobConf jobConf,
Path path, Class<? extends Writable> valueClass, boolean isCompressed,
Properties properties, Progressable progressable) throws IOException {
Schema schema;
@@ -62,7 +62,7 @@ public class AvroContainerOutputFormat
}
GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(schema);
DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(gdw);
-
+
if (isCompressed) {
int level = jobConf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
String codecName = jobConf.get(OUTPUT_CODEC, DEFLATE_CODEC);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java Tue Oct 1 04:48:44 2013
@@ -18,18 +18,18 @@
package org.apache.hadoop.hive.ql.io.avro;
+import java.io.IOException;
+
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
import org.apache.hadoop.io.Writable;
-import java.io.IOException;
-
/**
* Write an Avro GenericRecord to an Avro data file.
*/
-public class AvroGenericRecordWriter implements FileSinkOperator.RecordWriter{
+public class AvroGenericRecordWriter implements FSRecordWriter{
final private DataFileWriter<GenericRecord> dfw;
public AvroGenericRecordWriter(DataFileWriter<GenericRecord> dfw) {
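
The Avro pair above follows the same pattern: AvroContainerOutputFormat builds the
DataFileWriter and AvroGenericRecordWriter wraps it as an FSRecordWriter. A rough standalone
usage sketch, outside Hive's planner (schemaJson and record are assumed to be in scope, and
the AvroGenericRecordWritable constructor taking a GenericRecord plus the close(boolean)
signature are assumptions, not shown in this patch):

  Schema schema = new Schema.Parser().parse(schemaJson);
  DataFileWriter<GenericRecord> dfw =
      new DataFileWriter<GenericRecord>(new GenericDatumWriter<GenericRecord>(schema));
  dfw.create(schema, new File("/tmp/out.avro"));
  FSRecordWriter writer = new AvroGenericRecordWriter(dfw);
  writer.write(new AvroGenericRecordWritable(record)); // wrapping assumed
  writer.close(false);
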
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Tue Oct 1 04:48:44 2013
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
class BitFieldReader {
- private RunLengthByteReader input;
+ private final RunLengthByteReader input;
private final int bitSize;
private int current;
private int bitsLeft;
@@ -60,6 +62,30 @@ class BitFieldReader {
return result & mask;
}
+ void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException {
+
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value of null for int types in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+
+ // The default value for nulls in Vectorization for int types is 1
+ // and given that non null value can also be 1, we need to check for isNull also
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
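
The new nextVector() above fills a whole LongColumnVector in one call: null slots get the
vectorization default of 1, non-null slots are read via next(), and isRepeating is only left
true when every (value, isNull) pair matches row 0. A caller-side sketch (assumed; in
practice the null mask is produced by the present-stream reader before this call):

  LongColumnVector col = new LongColumnVector(1024); // typical batch size
  // ... col.isNull[] populated for this batch ...
  reader.nextVector(col, batchSize);
  if (col.isRepeating) {
    // every row in the batch carries the same value/null state; only row 0 need be read
  }
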
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Tue Oct 1 04:48:44 2013
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.io.Text;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.hadoop.io.Text;
+
/**
* A class that is a growable array of bytes. Growth is managed in terms of
* chunks that are allocated when needed.
@@ -237,6 +237,7 @@ final class DynamicByteArray {
}
}
+ @Override
public String toString() {
int i;
StringBuilder sb = new StringBuilder(length * 3);
@@ -268,10 +269,35 @@ final class DynamicByteArray {
}
/**
+ * Gets all the bytes of the array.
+ *
+ * @return Bytes of the array
+ */
+ public byte[] get() {
+ byte[] result = null;
+ if (length > 0) {
+ int currentChunk = 0;
+ int currentOffset = 0;
+ int currentLength = Math.min(length, chunkSize);
+ int destOffset = 0;
+ result = new byte[length];
+ int totalLength = length;
+ while (totalLength > 0) {
+ System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+ destOffset += currentLength;
+ totalLength -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ currentLength = Math.min(totalLength, chunkSize - currentOffset);
+ }
+ }
+ return result;
+ }
+
+ /**
* Get the size of the buffers.
*/
public long getSizeInBytes() {
return initializedChunks * chunkSize;
}
}
-
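
As a worked illustration of the new get() (numbers chosen for the example only): with
chunkSize = 4 and length = 10, the loop copies 4 bytes from chunk 0, 4 from chunk 1 and the
remaining 2 from chunk 2 into a single 10-byte array, i.e. currentLength takes the values
min(10,4)=4, min(6,4)=4, min(2,4)=2 while destOffset advances 0, 4, 8.
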
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Tue Oct 1 04:48:44 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
* Interface for reading integers.
@@ -52,4 +53,12 @@ interface IntegerReader {
* @throws IOException
*/
long next() throws IOException;
+
+ /**
+ * Return the next available vector for values.
+ * @return
+ * @throws IOException
+ */
+ void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException;
}
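
Concrete IntegerReader implementations now have to supply this nextVector(). A minimal
sketch written purely in terms of next(), mirroring the BitFieldReader version above (the
actual readers touched by this patch may do something more specialized):

  public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
    previous.isRepeating = true;
    for (int i = 0; i < previousLen; i++) {
      // nulls take the vectorization default of 1; real values come from the stream
      previous.vector[i] = previous.isNull[i] ? 1 : next();
      if (previous.isRepeating && i > 0
          && (previous.vector[i - 1] != previous.vector[i]
              || previous.isNull[i - 1] != previous.isNull[i])) {
        previous.isRepeating = false;
      }
    }
  }
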
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Oct 1 04:48:44 2013
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -38,6 +37,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -54,12 +55,13 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
-
/**
* A MapReduce/Hive input format for ORC files.
*/
-public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
- InputFormatChecker {
+public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
+ InputFormatChecker, VectorizedInputFormatInterface {
+
+ VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
@@ -85,6 +87,7 @@ public class OrcInputFormat implements I
private final int numColumns;
private float progress = 0.0f;
+
OrcRecordReader(Reader file, Configuration conf,
long offset, long length) throws IOException {
String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
@@ -219,6 +222,12 @@ public class OrcInputFormat implements I
public RecordReader<NullWritable, OrcStruct>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
+ if (isVectorMode(conf)) {
+ RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
+ reporter);
+ return (RecordReader) vorr;
+ }
+
FileSplit fileSplit = (FileSplit) inputSplit;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
@@ -231,6 +240,11 @@ public class OrcInputFormat implements I
public boolean validateInput(FileSystem fs, HiveConf conf,
ArrayList<FileStatus> files
) throws IOException {
+
+ if (isVectorMode(conf)) {
+ return voif.validateInput(fs, conf, files);
+ }
+
if (files.size() <= 0) {
return false;
}
@@ -244,6 +258,14 @@ public class OrcInputFormat implements I
return true;
}
+ private boolean isVectorMode(Configuration conf) {
+ if (Utilities.getPlanPath(conf) != null && Utilities
+ .getMapRedWork(conf).getMapWork().getVectorMode()) {
+ return true;
+ }
+ return false;
+ }
+
/**
* Get the list of input {@link Path}s for the map-reduce job.
*