Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/01 06:48:48 UTC

svn commit: r1527883 [3/6] - in /hive/branches/tez: ./ ant/src/org/apache/hadoop/hive/ant/ beeline/src/java/org/apache/hive/beeline/ bin/ common/src/java/org/apache/hadoop/hive/conf/ conf/ contrib/src/java/org/apache/hadoop/hive/contrib/fileformat/base...

Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java Tue Oct  1 04:48:44 2013
@@ -21,6 +21,9 @@ package org.apache.hadoop.hive.metastore
 import static org.apache.commons.lang.StringUtils.join;
 import static org.apache.commons.lang.StringUtils.repeat;
 
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -30,10 +33,11 @@ import java.util.TreeMap;
 
 import javax.jdo.PersistenceManager;
 import javax.jdo.Query;
+import javax.jdo.Transaction;
+import javax.jdo.datastore.JDOConnection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hive.common.ObjectPair;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Order;
@@ -63,9 +67,87 @@ class MetaStoreDirectSql {
   private static final Log LOG = LogFactory.getLog(MetaStoreDirectSql.class);
 
   private final PersistenceManager pm;
+  /**
+   * We want to avoid db-specific code in this class and stick with ANSI SQL. However, MySQL
+   * and Postgres are differently ANSI-incompatible (MySQL by default doesn't support quoted
+   * identifiers, and Postgres contravenes ANSI by coercing unquoted ones to lower case).
+   * MySQL's way of working around this is simpler (just set ANSI quotes mode on), so we will
+   * use that. MySQL detection is done by actually issuing the set-ansi-quotes command.
+   */
+  private final boolean isMySql;
+
+  /**
+   * Whether direct SQL can be used with the current datastore backing {@link #pm}.
+   */
+  private final boolean isCompatibleDatastore;
+
+  // TODO: we might also want to work around the strange and arguably non-standard behavior
+  // of postgres where it rolls back a tx after a failed select (see SQL92 4.28, on page 69
+  // about implicit rollbacks; 4.10.1 last paragraph for the "spirit" of the standard).
+  // See #canUseDirectSql in ObjectStore; isActiveTransaction is undesirable but unavoidable
+  // for postgres; in MySQL and other databases we could avoid it.
 
   public MetaStoreDirectSql(PersistenceManager pm) {
     this.pm = pm;
+    Transaction tx = pm.currentTransaction();
+    tx.begin();
+    boolean isMySql = false;
+    try {
+      trySetAnsiQuotesForMysql();
+      isMySql = true;
+    } catch (SQLException sqlEx) {
+      LOG.info("MySQL check failed, assuming we are not on mysql: " + sqlEx.getMessage());
+      tx.rollback();
+      tx = pm.currentTransaction();
+      tx.begin();
+    }
+    // This should work. If it doesn't, we will self-disable. What a PITA...
+    boolean isCompatibleDatastore = false;
+    String selfTestQuery = "select \"DB_ID\" from \"DBS\"";
+    try {
+      pm.newQuery("javax.jdo.query.SQL", selfTestQuery).execute();
+      isCompatibleDatastore = true;
+      tx.commit();
+    } catch (Exception ex) {
+      LOG.error("Self-test query [" + selfTestQuery + "] failed; direct SQL is disabled", ex);
+      tx.rollback();
+    }
+
+    this.isCompatibleDatastore = isCompatibleDatastore;
+    this.isMySql = isMySql;
+  }
+
+  public boolean isCompatibleDatastore() {
+    return isCompatibleDatastore;
+  }
+
+  /**
+   * See {@link #trySetAnsiQuotesForMysql()}.
+   */
+  private void setAnsiQuotesForMysql() throws MetaException {
+    try {
+      trySetAnsiQuotesForMysql();
+    } catch (SQLException sqlEx) {
+      throw new MetaException("Error setting ansi quotes: " + sqlEx.getMessage());
+    }
+  }
+
+  /**
+   * MySQL, by default, doesn't recognize the ANSI-quoted identifiers that we need to use
+   * for Postgres. Try to set ANSI quotes mode on for the session. Due to connection
+   * pooling, this needs to be called in the same transaction as the actual queries.
+   */
+  private void trySetAnsiQuotesForMysql() throws SQLException {
+    final String queryText = "SET @@session.sql_mode=ANSI_QUOTES";
+    JDOConnection jdoConn = pm.getDataStoreConnection();
+    boolean doTrace = LOG.isDebugEnabled();
+    try {
+      long start = doTrace ? System.nanoTime() : 0;
+      ((Connection)jdoConn.getNativeConnection()).createStatement().execute(queryText);
+      timingTrace(doTrace, queryText, start, doTrace ? System.nanoTime() : 0);
+    } finally {
+      jdoConn.close(); // We must release the connection before we call other pm methods.
+    }
   }
 
   /**
@@ -83,7 +165,8 @@ class MetaStoreDirectSql {
     }
     String list = repeat(",?", partNames.size()).substring(1);
     return getPartitionsViaSqlFilterInternal(dbName, tblName, null,
-        "and PARTITIONS.PART_NAME in (" + list + ")", partNames, new ArrayList<String>(), max);
+        "and \"PARTITIONS\".\"PART_NAME\" in (" + list + ")",
+        partNames, new ArrayList<String>(), max);
   }
 
   /**
@@ -124,9 +207,9 @@ class MetaStoreDirectSql {
   }
 
   private boolean isViewTable(String dbName, String tblName) throws MetaException {
-    String queryText = "select TBL_TYPE from TBLS" +
-        " inner join DBS on TBLS.DB_ID = DBS.DB_ID " +
-        " where TBLS.TBL_NAME = ? and DBS.NAME = ?";
+    String queryText = "select \"TBL_TYPE\" from \"TBLS\"" +
+        " inner join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\" " +
+        " where \"TBLS\".\"TBL_NAME\" = ? and \"DBS\".\"NAME\" = ?";
     Object[] params = new Object[] { tblName, dbName };
     Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
     query.setUnique(true);
@@ -155,7 +238,11 @@ class MetaStoreDirectSql {
     dbName = dbName.toLowerCase();
     tblName = tblName.toLowerCase();
     // We have to be mindful of order during filtering if we are not returning all partitions.
-    String orderForFilter = (max != null) ? " order by PART_NAME asc" : "";
+    String orderForFilter = (max != null) ? " order by \"PART_NAME\" asc" : "";
+    if (isMySql) {
+      assert pm.currentTransaction().isActive();
+      setAnsiQuotesForMysql(); // must be inside tx together with queries
+    }
 
     // Get all simple fields for partitions and related objects, which we can map one-on-one.
     // We will do this in 2 queries to use different existing indices for each one.
@@ -163,13 +250,13 @@ class MetaStoreDirectSql {
     // TODO: We might want to tune the indexes instead. With current ones MySQL performs
     // poorly, esp. with 'order by' w/o index on large tables, even if the number of actual
     // results is small (query that returns 8 out of 32k partitions can go 4sec. to 0sec. by
-    // just adding a PART_ID IN (...) filter that doesn't alter the results to it, probably
+    // just adding a "PART_ID" IN (...) filter that doesn't alter the results to it, probably
     // causing it to not sort the entire table due to not knowing how selective the filter is.
     String queryText =
-        "select PARTITIONS.PART_ID from PARTITIONS"
-      + "  inner join TBLS on PARTITIONS.TBL_ID = TBLS.TBL_ID "
-      + "  inner join DBS on TBLS.DB_ID = DBS.DB_ID "
-      + join(joinsForFilter, ' ') + " where TBLS.TBL_NAME = ? and DBS.NAME = ? "
+        "select \"PARTITIONS\".\"PART_ID\" from \"PARTITIONS\""
+      + "  inner join \"TBLS\" on \"PARTITIONS\".\"TBL_ID\" = \"TBLS\".\"TBL_ID\" "
+      + "  inner join \"DBS\" on \"TBLS\".\"DB_ID\" = \"DBS\".\"DB_ID\" "
+      + join(joinsForFilter, ' ') + " where \"TBLS\".\"TBL_NAME\" = ? and \"DBS\".\"NAME\" = ? "
       + (sqlFilter == null ? "" : sqlFilter) + orderForFilter;
     Object[] params = new Object[paramsForFilter.size() + 2];
     params[0] = tblName;
@@ -203,14 +290,15 @@ class MetaStoreDirectSql {
 
     // Now get most of the other fields.
     queryText =
-      "select PARTITIONS.PART_ID, SDS.SD_ID, SDS.CD_ID, SERDES.SERDE_ID, "
-    + "  PARTITIONS.CREATE_TIME, PARTITIONS.LAST_ACCESS_TIME, SDS.INPUT_FORMAT, "
-    + "  SDS.IS_COMPRESSED, SDS.IS_STOREDASSUBDIRECTORIES, SDS.LOCATION,  SDS.NUM_BUCKETS, "
-    + "  SDS.OUTPUT_FORMAT, SERDES.NAME, SERDES.SLIB "
-    + "from PARTITIONS"
-    + "  left outer join SDS on PARTITIONS.SD_ID = SDS.SD_ID "
-    + "  left outer join SERDES on SDS.SERDE_ID = SERDES.SERDE_ID "
-    + "where PART_ID in (" + partIds + ") order by PART_NAME asc";
+      "select \"PARTITIONS\".\"PART_ID\", \"SDS\".\"SD_ID\", \"SDS\".\"CD_ID\","
+    + " \"SERDES\".\"SERDE_ID\", \"PARTITIONS\".\"CREATE_TIME\","
+    + " \"PARTITIONS\".\"LAST_ACCESS_TIME\", \"SDS\".\"INPUT_FORMAT\", \"SDS\".\"IS_COMPRESSED\","
+    + " \"SDS\".\"IS_STOREDASSUBDIRECTORIES\", \"SDS\".\"LOCATION\", \"SDS\".\"NUM_BUCKETS\","
+    + " \"SDS\".\"OUTPUT_FORMAT\", \"SERDES\".\"NAME\", \"SERDES\".\"SLIB\" "
+    + "from \"PARTITIONS\""
+    + "  left outer join \"SDS\" on \"PARTITIONS\".\"SD_ID\" = \"SDS\".\"SD_ID\" "
+    + "  left outer join \"SERDES\" on \"SDS\".\"SERDE_ID\" = \"SERDES\".\"SERDE_ID\" "
+    + "where \"PART_ID\" in (" + partIds + ") order by \"PART_NAME\" asc";
     start = doTrace ? System.nanoTime() : 0;
     query = pm.newQuery("javax.jdo.query.SQL", queryText);
     @SuppressWarnings("unchecked")
@@ -254,8 +342,8 @@ class MetaStoreDirectSql {
       part.setValues(new ArrayList<String>());
       part.setDbName(dbName);
       part.setTableName(tblName);
-      if (fields[4] != null) part.setCreateTime((Integer)fields[4]);
-      if (fields[5] != null) part.setLastAccessTime((Integer)fields[5]);
+      if (fields[4] != null) part.setCreateTime(extractSqlInt(fields[4]));
+      if (fields[5] != null) part.setLastAccessTime(extractSqlInt(fields[5]));
       partitions.put(partitionId, part);
 
       if (sdId == null) continue; // Probably a view.
@@ -279,7 +367,7 @@ class MetaStoreDirectSql {
       tmpBoolean = extractSqlBoolean(fields[8]);
       if (tmpBoolean != null) sd.setStoredAsSubDirectories(tmpBoolean);
       sd.setLocation((String)fields[9]);
-      if (fields[10] != null) sd.setNumBuckets((Integer)fields[10]);
+      if (fields[10] != null) sd.setNumBuckets(extractSqlInt(fields[10]));
       sd.setOutputFormat((String)fields[11]);
       sdSb.append(sdId).append(",");
       part.setSd(sd);
@@ -309,15 +397,17 @@ class MetaStoreDirectSql {
     timingTrace(doTrace, queryText, start, queryTime);
 
     // Now get all the one-to-many things. Start with partitions.
-    queryText = "select PART_ID, PARAM_KEY, PARAM_VALUE from PARTITION_PARAMS where PART_ID in ("
-        + partIds + ") and PARAM_KEY is not null order by PART_ID asc";
+    queryText = "select \"PART_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from \"PARTITION_PARAMS\""
+        + " where \"PART_ID\" in (" + partIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"PART_ID\" asc";
     loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
       public void apply(Partition t, Object[] fields) {
         t.putToParameters((String)fields[1], (String)fields[2]);
       }});
 
-    queryText = "select PART_ID, PART_KEY_VAL from PARTITION_KEY_VALS where PART_ID in ("
-        + partIds + ") and INTEGER_IDX >= 0 order by PART_ID asc, INTEGER_IDX asc";
+    queryText = "select \"PART_ID\", \"PART_KEY_VAL\" from \"PARTITION_KEY_VALS\""
+        + " where \"PART_ID\" in (" + partIds + ") and \"INTEGER_IDX\" >= 0"
+        + " order by \"PART_ID\" asc, \"INTEGER_IDX\" asc";
     loopJoinOrderedResult(partitions, queryText, 0, new ApplyFunc<Partition>() {
       public void apply(Partition t, Object[] fields) {
         t.addToValues((String)fields[1]);
@@ -332,33 +422,35 @@ class MetaStoreDirectSql {
         colIds = trimCommaList(colsSb);
 
     // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs.
-    queryText = "select SD_ID, PARAM_KEY, PARAM_VALUE from SD_PARAMS where SD_ID in ("
-        + sdIds + ") and PARAM_KEY is not null order by SD_ID asc";
+    queryText = "select \"SD_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from \"SD_PARAMS\""
+        + " where \"SD_ID\" in (" + sdIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"SD_ID\" asc";
     loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
       public void apply(StorageDescriptor t, Object[] fields) {
         t.putToParameters((String)fields[1], (String)fields[2]);
       }});
 
-    // Note that SORT_COLS has "ORDER" column, which is not SQL92-legal. We have two choices
-    // here - drop SQL92, or get '*' and be broken on certain schema changes. We do the latter.
-    queryText = "select SD_ID, COLUMN_NAME, SORT_COLS.* from SORT_COLS where SD_ID in ("
-        + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+    queryText = "select \"SD_ID\", \"COLUMN_NAME\", \"SORT_COLS\".\"ORDER\" from \"SORT_COLS\""
+        + " where \"SD_ID\" in (" + sdIds + ") and \"INTEGER_IDX\" >= 0"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
     loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
       public void apply(StorageDescriptor t, Object[] fields) {
-        if (fields[4] == null) return;
-        t.addToSortCols(new Order((String)fields[1], (Integer)fields[4]));
+        if (fields[2] == null) return;
+        t.addToSortCols(new Order((String)fields[1], extractSqlInt(fields[2])));
       }});
 
-    queryText = "select SD_ID, BUCKET_COL_NAME from BUCKETING_COLS where SD_ID in ("
-        + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+    queryText = "select \"SD_ID\", \"BUCKET_COL_NAME\" from \"BUCKETING_COLS\""
+        + " where \"SD_ID\" in (" + sdIds + ") and \"INTEGER_IDX\" >= 0"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
     loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
       public void apply(StorageDescriptor t, Object[] fields) {
         t.addToBucketCols((String)fields[1]);
       }});
 
     // Skewed columns stuff.
-    queryText = "select SD_ID, SKEWED_COL_NAME from SKEWED_COL_NAMES where SD_ID in ("
-        + sdIds + ") and INTEGER_IDX >= 0 order by SD_ID asc, INTEGER_IDX asc";
+    queryText = "select \"SD_ID\", \"SKEWED_COL_NAME\" from \"SKEWED_COL_NAMES\""
+        + " where \"SD_ID\" in (" + sdIds + ") and \"INTEGER_IDX\" >= 0"
+        + " order by \"SD_ID\" asc, \"INTEGER_IDX\" asc";
     boolean hasSkewedColumns =
       loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
         public void apply(StorageDescriptor t, Object[] fields) {
@@ -370,16 +462,17 @@ class MetaStoreDirectSql {
     if (hasSkewedColumns) {
       // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
       queryText =
-            "select SKEWED_VALUES.SD_ID_OID, SKEWED_STRING_LIST_VALUES.STRING_LIST_ID, "
-          + "  SKEWED_STRING_LIST_VALUES.STRING_LIST_VALUE "
-          + "from SKEWED_VALUES "
-          + "  left outer join SKEWED_STRING_LIST_VALUES on "
-          + "    SKEWED_VALUES.STRING_LIST_ID_EID = SKEWED_STRING_LIST_VALUES.STRING_LIST_ID "
-          + "where SKEWED_VALUES.SD_ID_OID in (" + sdIds + ") "
-          + "  and SKEWED_VALUES.STRING_LIST_ID_EID is not null "
-          + "  and SKEWED_VALUES.INTEGER_IDX >= 0 "
-          + "order by SKEWED_VALUES.SD_ID_OID asc, SKEWED_VALUES.INTEGER_IDX asc, "
-          + "  SKEWED_STRING_LIST_VALUES.INTEGER_IDX asc";
+            "select \"SKEWED_VALUES\".\"SD_ID_OID\","
+          + "  \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_ID\","
+          + "  \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_VALUE\" "
+          + "from \"SKEWED_VALUES\" "
+          + "  left outer join \"SKEWED_STRING_LIST_VALUES\" on \"SKEWED_VALUES\"."
+          + "\"STRING_LIST_ID_EID\" = \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_ID\" "
+          + "where \"SKEWED_VALUES\".\"SD_ID_OID\" in (" + sdIds + ") "
+          + "  and \"SKEWED_VALUES\".\"STRING_LIST_ID_EID\" is not null "
+          + "  and \"SKEWED_VALUES\".\"INTEGER_IDX\" >= 0 "
+          + "order by \"SKEWED_VALUES\".\"SD_ID_OID\" asc, \"SKEWED_VALUES\".\"INTEGER_IDX\" asc,"
+          + "  \"SKEWED_STRING_LIST_VALUES\".\"INTEGER_IDX\" asc";
       loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
         private Long currentListId;
         private List<String> currentList;
@@ -404,16 +497,18 @@ class MetaStoreDirectSql {
 
       // We are skipping the SKEWED_STRING_LIST table here, as it seems to be totally useless.
       queryText =
-            "select SKEWED_COL_VALUE_LOC_MAP.SD_ID, SKEWED_STRING_LIST_VALUES.STRING_LIST_ID,"
-          + "  SKEWED_COL_VALUE_LOC_MAP.LOCATION, SKEWED_STRING_LIST_VALUES.STRING_LIST_VALUE "
-          + "from SKEWED_COL_VALUE_LOC_MAP"
-          + "  left outer join SKEWED_STRING_LIST_VALUES on SKEWED_COL_VALUE_LOC_MAP."
-          + "STRING_LIST_ID_KID = SKEWED_STRING_LIST_VALUES.STRING_LIST_ID "
-          + "where SKEWED_COL_VALUE_LOC_MAP.SD_ID in (" + sdIds + ")"
-          + "  and SKEWED_COL_VALUE_LOC_MAP.STRING_LIST_ID_KID is not null "
-          + "order by SKEWED_COL_VALUE_LOC_MAP.SD_ID asc,"
-          + "  SKEWED_STRING_LIST_VALUES.STRING_LIST_ID asc,"
-          + "  SKEWED_STRING_LIST_VALUES.INTEGER_IDX asc";
+            "select \"SKEWED_COL_VALUE_LOC_MAP\".\"SD_ID\","
+          + " \"SKEWED_STRING_LIST_VALUES\".STRING_LIST_ID,"
+          + " \"SKEWED_COL_VALUE_LOC_MAP\".\"LOCATION\","
+          + " \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_VALUE\" "
+          + "from \"SKEWED_COL_VALUE_LOC_MAP\""
+          + "  left outer join \"SKEWED_STRING_LIST_VALUES\" on \"SKEWED_COL_VALUE_LOC_MAP\"."
+          + "\"STRING_LIST_ID_KID\" = \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_ID\" "
+          + "where \"SKEWED_COL_VALUE_LOC_MAP\".\"SD_ID\" in (" + sdIds + ")"
+          + "  and \"SKEWED_COL_VALUE_LOC_MAP\".\"STRING_LIST_ID_KID\" is not null "
+          + "order by \"SKEWED_COL_VALUE_LOC_MAP\".\"SD_ID\" asc,"
+          + "  \"SKEWED_STRING_LIST_VALUES\".\"STRING_LIST_ID\" asc,"
+          + "  \"SKEWED_STRING_LIST_VALUES\".\"INTEGER_IDX\" asc";
 
       loopJoinOrderedResult(sds, queryText, 0, new ApplyFunc<StorageDescriptor>() {
         private Long currentListId;
@@ -447,8 +542,9 @@ class MetaStoreDirectSql {
     // Get FieldSchema stuff if any.
     if (!colss.isEmpty()) {
       // We are skipping the CDS table here, as it seems to be totally useless.
-      queryText = "select CD_ID, COMMENT, COLUMN_NAME, TYPE_NAME from COLUMNS_V2 where CD_ID in ("
-          + colIds + ") and INTEGER_IDX >= 0 order by CD_ID asc, INTEGER_IDX asc";
+      queryText = "select \"CD_ID\", \"COMMENT\", \"COLUMN_NAME\", \"TYPE_NAME\""
+          + " from \"COLUMNS_V2\" where \"CD_ID\" in (" + colIds + ") and \"INTEGER_IDX\" >= 0"
+          + " order by \"CD_ID\" asc, \"INTEGER_IDX\" asc";
       loopJoinOrderedResult(colss, queryText, 0, new ApplyFunc<List<FieldSchema>>() {
         public void apply(List<FieldSchema> t, Object[] fields) {
           t.add(new FieldSchema((String)fields[2], (String)fields[3], (String)fields[1]));
@@ -456,8 +552,9 @@ class MetaStoreDirectSql {
     }
 
     // Finally, get all the stuff for serdes - just the params.
-    queryText = "select SERDE_ID, PARAM_KEY, PARAM_VALUE from SERDE_PARAMS where SERDE_ID in ("
-        + serdeIds + ") and PARAM_KEY is not null order by SERDE_ID asc";
+    queryText = "select \"SERDE_ID\", \"PARAM_KEY\", \"PARAM_VALUE\" from \"SERDE_PARAMS\""
+        + " where \"SERDE_ID\" in (" + serdeIds + ") and \"PARAM_KEY\" is not null"
+        + " order by \"SERDE_ID\" asc";
     loopJoinOrderedResult(serdes, queryText, 0, new ApplyFunc<SerDeInfo>() {
       public void apply(SerDeInfo t, Object[] fields) {
         t.putToParameters((String)fields[1], (String)fields[2]);
@@ -469,7 +566,7 @@ class MetaStoreDirectSql {
   private void timingTrace(boolean doTrace, String queryText, long start, long queryTime) {
     if (!doTrace) return;
     LOG.debug("Direct SQL query in " + (queryTime - start) / 1000000.0 + "ms + " +
-        (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [ " + queryText + "]");
+        (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [" + queryText + "]");
   }
 
   private static Boolean extractSqlBoolean(Object value) throws MetaException {
@@ -486,6 +583,10 @@ class MetaStoreDirectSql {
     throw new MetaException("Cannot extrace boolean from column value " + value);
   }
 
+  private int extractSqlInt(Object field) {
+    return ((Number)field).intValue();
+  }
+
   private static String trimCommaList(StringBuilder sb) {
     if (sb.length() > 0) {
       sb.setLength(sb.length() - 1);
@@ -632,12 +733,12 @@ class MetaStoreDirectSql {
         }
       }
       if (joins.get(partColIndex) == null) {
-        joins.set(partColIndex, "inner join PARTITION_KEY_VALS as FILTER" + partColIndex
-            + " on FILTER"  + partColIndex + ".PART_ID = PARTITIONS.PART_ID and FILTER"
-            + partColIndex + ".INTEGER_IDX = " + partColIndex);
+        joins.set(partColIndex, "inner join \"PARTITION_KEY_VALS\" as \"FILTER" + partColIndex
+            + "\" on \"FILTER"  + partColIndex + "\".\"PART_ID\" = \"PARTITIONS\".\"PART_ID\""
+            + " and \"FILTER" + partColIndex + "\".\"INTEGER_IDX\" = " + partColIndex);
       }
 
-      String tableValue = "FILTER" + partColIndex + ".PART_KEY_VAL";
+      String tableValue = "\"FILTER" + partColIndex + "\".\"PART_KEY_VAL\"";
       // TODO: need casts here if #doesOperatorSupportIntegral is amended to include lt/gt/etc.
       filterBuffer.append(node.isReverseOrder
           ? "(? " + node.operator.getSqlOp() + " " + tableValue + ")"

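A note for readers tracing the new MySQL handling above: detection works by issuing
MySQL's session-level ANSI_QUOTES switch on the JDO-managed JDBC connection and treating
failure as "not MySQL"; the same switch is what makes the "TBLS"."TBL_NAME"-style quoting
in the rewritten queries legal on MySQL. A minimal standalone sketch of that probe,
assuming a plain java.sql.Connection rather than the PersistenceManager plumbing used in
the patch:

    import java.sql.Connection;
    import java.sql.SQLException;
    import java.sql.Statement;

    public final class AnsiQuotesProbe {
      /**
       * Returns true if the connection accepts MySQL's ANSI_QUOTES switch.
       * Only MySQL understands the command; on other databases (Postgres,
       * Derby, ...) it throws, and quoted identifiers are assumed to work
       * per the ANSI standard already.
       */
      static boolean trySetAnsiQuotes(Connection conn) {
        try (Statement stmt = conn.createStatement()) {
          stmt.execute("SET @@session.sql_mode=ANSI_QUOTES");
          return true;
        } catch (SQLException e) {
          return false;
        }
      }
    }

The extractSqlInt change in the same file follows the same portability theme: different
drivers return Integer or Long for integral columns, so the code narrows through
Number.intValue() instead of casting to Integer.
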
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Tue Oct  1 04:48:44 2013
@@ -2115,6 +2115,7 @@ public class ObjectStore implements RawS
     // TODO: Drop table can be very slow on large tables, we might want to address this.
     return allowSql
       && HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL)
+      && directSql.isCompatibleDatastore()
       && !isActiveTransaction();
   }
 

Modified: hive/branches/tez/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/build.xml?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/build.xml (original)
+++ hive/branches/tez/ql/build.xml Tue Oct  1 04:48:44 2013
@@ -190,7 +190,7 @@
     <echo message="Project: ${ant.project.name}"/>
     <javac
      encoding="${build.encoding}"
-     srcdir="${src.dir}:${basedir}/src/gen/thrift/gen-javabean:${build.dir}/gen/antlr/gen-java:${protobuf.build.dir}"
+     srcdir="${src.dir}:${basedir}/src/gen/thrift/gen-javabean:${build.dir}/gen/antlr/gen-java:${protobuf.build.dir}:${build.dir}/gen/vector"
      includes="**/*.java"
      destdir="${build.classes}"
      debug="${javac.debug}"

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java Tue Oct  1 04:48:44 2013
@@ -362,6 +362,7 @@ public enum ErrorMsg {
   UNSUPPORTED_ALTER_TBL_OP(10245, "{0} alter table options is not supported"),
   INVALID_BIGTABLE_MAPJOIN(10246, "{0} table chosen for streaming is not valid", true),
   MISSING_OVER_CLAUSE(10247, "Missing over clause for function : "),
+  PARTITION_SPEC_TYPE_MISMATCH(10248, "Cannot add partition column {0} of type {1} as it cannot be converted to type {2}", true),
 
   INVALID_HDFS_URI(10248, "{0} is not a hdfs uri", true),
   INVALID_DIR(10249, "{0} is not a directory", true),

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Tue Oct  1 04:48:44 2013
@@ -373,6 +373,12 @@ public class FetchOperator implements Se
       job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
           .toString()));
 
+      // The fetch operator is not vectorized, so turn the vectorization flag off here
+      // so that a non-vectorized record reader is created below.
+      if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+        HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+      }
+
       PartitionDesc partDesc;
       if (currTbl == null) {
         partDesc = currPart;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Oct  1 04:48:44 2013
@@ -35,11 +35,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveKey;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
-import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.io.HivePartitioner;
+import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
@@ -84,11 +86,13 @@ public class FileSinkOperator extends Te
   protected transient int dpStartCol; // start column # for DP columns
   protected transient List<String> dpVals; // array of values corresponding to DP columns
   protected transient List<Object> dpWritables;
-  protected transient RecordWriter[] rowOutWriters; // row specific RecordWriters
+  protected transient FSRecordWriter[] rowOutWriters; // row specific RecordWriters
   protected transient int maxPartitions;
   protected transient ListBucketingCtx lbCtx;
   protected transient boolean isSkewedStoredAsSubDirectories;
-  private transient boolean statsCollectRawDataSize;
+  protected transient boolean statsCollectRawDataSize;
+  private transient boolean[] statsFromRecordWriter;
+  private transient boolean isCollectRWStats;
 
 
   private static final transient String[] FATAL_ERR_MSG = {
@@ -96,22 +100,12 @@ public class FileSinkOperator extends Te
       "Number of dynamic partitions exceeded hive.exec.max.dynamic.partitions.pernode."
   };
 
-  /**
-   * RecordWriter.
-   *
-   */
-  public static interface RecordWriter {
-    void write(Writable w) throws IOException;
-
-    void close(boolean abort) throws IOException;
-  }
-
   public class FSPaths implements Cloneable {
     Path tmpPath;
     Path taskOutputTempPath;
     Path[] outPaths;
     Path[] finalPaths;
-    RecordWriter[] outWriters;
+    FSRecordWriter[] outWriters;
     Stat stat;
 
     public FSPaths() {
@@ -122,7 +116,7 @@ public class FileSinkOperator extends Te
       taskOutputTempPath = Utilities.toTaskTempPath(specPath);
       outPaths = new Path[numFiles];
       finalPaths = new Path[numFiles];
-      outWriters = new RecordWriter[numFiles];
+      outWriters = new FSRecordWriter[numFiles];
       stat = new Stat();
     }
 
@@ -166,11 +160,11 @@ public class FileSinkOperator extends Te
       }
     }
 
-    public void setOutWriters(RecordWriter[] out) {
+    public void setOutWriters(FSRecordWriter[] out) {
       outWriters = out;
     }
 
-    public RecordWriter[] getOutWriters() {
+    public FSRecordWriter[] getOutWriters() {
       return outWriters;
     }
 
@@ -221,6 +215,10 @@ public class FileSinkOperator extends Te
         }
       }
     }
+
+    public Stat getStat() {
+      return stat;
+    }
   } // class FSPaths
 
   private static final long serialVersionUID = 1L;
@@ -228,7 +226,7 @@ public class FileSinkOperator extends Te
   protected transient Serializer serializer;
   protected transient BytesWritable commonKey = new BytesWritable();
   protected transient TableIdEnum tabIdEnum = null;
-  private transient LongWritable row_count;
+  protected transient LongWritable row_count;
   private transient boolean isNativeTable = true;
 
   /**
@@ -237,17 +235,17 @@ public class FileSinkOperator extends Te
    * each reducer can write 10 files - this way we effectively get 1000 files.
    */
   private transient ExprNodeEvaluator[] partitionEval;
-  private transient int totalFiles;
+  protected transient int totalFiles;
   private transient int numFiles;
-  private transient boolean multiFileSpray;
-  private transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+  protected transient boolean multiFileSpray;
+  protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
 
   private transient ObjectInspector[] partitionObjectInspectors;
-  private transient HivePartitioner<HiveKey, Object> prtner;
-  private transient final HiveKey key = new HiveKey();
+  protected transient HivePartitioner<HiveKey, Object> prtner;
+  protected transient final HiveKey key = new HiveKey();
   private transient Configuration hconf;
-  private transient FSPaths fsp;
-  private transient boolean bDynParts;
+  protected transient FSPaths fsp;
+  protected transient boolean bDynParts;
   private transient SubStructObjectInspector subSetOI;
   private transient int timeOut; // JT timeout in msec.
   private transient long lastProgressReport = System.currentTimeMillis();
@@ -279,7 +277,7 @@ public class FileSinkOperator extends Te
   Class<? extends Writable> outputClass;
   String taskId;
 
-  private boolean filesCreated = false;
+  protected boolean filesCreated = false;
 
   private void initializeSpecPath() {
     // For a query of the type:
@@ -324,6 +322,7 @@ public class FileSinkOperator extends Te
       isCompressed = conf.getCompressed();
       parent = Utilities.toTempPath(conf.getDirName());
       statsCollectRawDataSize = conf.isStatsCollectRawDataSize();
+      statsFromRecordWriter = new boolean[numFiles];
 
       serializer = (Serializer) conf.getTableInfo().getDeserializerClass().newInstance();
       serializer.initialize(null, conf.getTableInfo().getProperties());
@@ -432,7 +431,7 @@ public class FileSinkOperator extends Te
     }
   }
 
-  private void createBucketFiles(FSPaths fsp) throws HiveException {
+  protected void createBucketFiles(FSPaths fsp) throws HiveException {
     try {
       int filesIdx = 0;
       Set<Integer> seenBuckets = new HashSet<Integer>();
@@ -516,6 +515,8 @@ public class FileSinkOperator extends Te
         fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
             jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx],
             reporter);
+        // If the record writer provides stats, get them from there instead of from the serde
+        statsFromRecordWriter[filesIdx] = fsp.outWriters[filesIdx] instanceof StatsProvidingRecordWriter;
         // increment the CREATED_FILES counter
         if (reporter != null) {
           reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
@@ -544,7 +545,7 @@ public class FileSinkOperator extends Te
    *
    * @return true if a new progress update is reported, false otherwise.
    */
-  private boolean updateProgress() {
+  protected boolean updateProgress() {
     if (reporter != null &&
         (System.currentTimeMillis() - lastProgressReport) > timeOut) {
       reporter.progress();
@@ -555,7 +556,7 @@ public class FileSinkOperator extends Te
     }
   }
 
-  Writable recordValue;
+  protected Writable recordValue;
 
   @Override
   public void processOp(Object row, int tag) throws HiveException {
@@ -619,7 +620,11 @@ public class FileSinkOperator extends Te
       }
 
       rowOutWriters = fpaths.outWriters;
-      if (conf.isGatherStats()) {
+      // Check if all record writers implement statistics. If at least one RW
+      // doesn't implement the stats interface, we will fall back to the
+      // conventional way of gathering stats.
+      isCollectRWStats = areAllTrue(statsFromRecordWriter);
+      if (conf.isGatherStats() && !isCollectRWStats) {
         if (statsCollectRawDataSize) {
           SerDeStats stats = serializer.getSerDeStats();
           if (stats != null) {
@@ -630,12 +635,14 @@ public class FileSinkOperator extends Te
       }
 
 
+      FSRecordWriter rowOutWriter = null;
+
       if (row_count != null) {
         row_count.set(row_count.get() + 1);
       }
 
       if (!multiFileSpray) {
-        rowOutWriters[0].write(recordValue);
+        rowOutWriter = rowOutWriters[0];
       } else {
         int keyHashCode = 0;
         for (int i = 0; i < partitionEval.length; i++) {
@@ -646,8 +653,9 @@ public class FileSinkOperator extends Te
         key.setHashCode(keyHashCode);
         int bucketNum = prtner.getBucket(key, null, totalFiles);
         int idx = bucketMap.get(bucketNum);
-        rowOutWriters[idx].write(recordValue);
+        rowOutWriter = rowOutWriters[idx];
       }
+      rowOutWriter.write(recordValue);
     } catch (IOException e) {
       throw new HiveException(e);
     } catch (SerDeException e) {
@@ -655,13 +663,22 @@ public class FileSinkOperator extends Te
     }
   }
 
+  private boolean areAllTrue(boolean[] statsFromRW) {
+    for(boolean b : statsFromRW) {
+      if (!b) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Lookup list bucketing path.
    * @param lbDirName
    * @return
    * @throws HiveException
    */
-  private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
+  protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
     FSPaths fsp2 = valToPaths.get(lbDirName);
     if (fsp2 == null) {
       fsp2 = createNewPaths(lbDirName);
@@ -699,7 +716,7 @@ public class FileSinkOperator extends Te
    * @param row row to process.
    * @return directory name.
    */
-  private String generateListBucketingDirName(Object row) {
+  protected String generateListBucketingDirName(Object row) {
     if (!this.isSkewedStoredAsSubDirectories) {
       return null;
     }
@@ -740,7 +757,7 @@ public class FileSinkOperator extends Te
     return lbDirName;
   }
 
-  private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
+  protected FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
 
     FSPaths fp;
 
@@ -864,6 +881,27 @@ public class FileSinkOperator extends Te
     if (!abort) {
       for (FSPaths fsp : valToPaths.values()) {
         fsp.closeWriters(abort);
+
+        // Before closing the operator, check if statistics gathering is requested
+        // and is provided by the record writer. This is different from the
+        // statistics gathering done in processOp(): there, serde statistics are
+        // gathered and accumulated in a hashmap for each row added, which adds
+        // overhead to the actual row processing. But if the record writer already
+        // gathers the statistics, it can simply return the accumulated statistics,
+        // which will be aggregated in the case of spray writers.
+        if (conf.isGatherStats() && isCollectRWStats) {
+          for (int idx = 0; idx < fsp.outWriters.length; idx++) {
+            FSRecordWriter outWriter = fsp.outWriters[idx];
+            if (outWriter != null) {
+              SerDeStats stats = ((StatsProvidingRecordWriter) outWriter).getStats();
+              if (stats != null) {
+                fsp.stat.addToStat(StatsSetupConst.RAW_DATA_SIZE, stats.getRawDataSize());
+                fsp.stat.addToStat(StatsSetupConst.ROW_COUNT, stats.getRowCount());
+              }
+            }
+          }
+        }
+
         if (isNativeTable) {
           fsp.commit(fs);
         }
@@ -934,7 +972,7 @@ public class FileSinkOperator extends Te
                  hiveOutputFormat = ReflectionUtils.newInstance(conf.getTableInfo().getOutputFileFormatClass(),job);
            }
           else {
-                 hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance(); 
+                 hiveOutputFormat = conf.getTableInfo().getOutputFileFormatClass().newInstance();
           }
         }
         else {

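The close-time stats path above leans on the new FSRecordWriter.StatsProvidingRecordWriter
contract: a writer that can report SerDeStats lets FileSinkOperator skip the per-row serde
accounting in processOp() and collect totals once at close. A hedged sketch of such a
writer (the CountingRecordWriter name and the delegate wiring are hypothetical; only the
interfaces and SerDeStats come from the patch):

    import java.io.IOException;

    import org.apache.hadoop.hive.ql.io.FSRecordWriter;
    import org.apache.hadoop.hive.ql.io.FSRecordWriter.StatsProvidingRecordWriter;
    import org.apache.hadoop.hive.serde2.SerDeStats;
    import org.apache.hadoop.io.Writable;

    // Wraps another writer and tracks row count itself, so the operator can
    // fetch the totals from getStats() instead of asking the serde per row.
    class CountingRecordWriter implements FSRecordWriter, StatsProvidingRecordWriter {
      private final FSRecordWriter delegate; // hypothetical wrapped writer
      private final SerDeStats stats = new SerDeStats();
      private long rows = 0;

      CountingRecordWriter(FSRecordWriter delegate) {
        this.delegate = delegate;
      }

      @Override
      public void write(Writable w) throws IOException {
        delegate.write(w);
        rows++; // cheap bookkeeping instead of per-row serde stats
      }

      @Override
      public void close(boolean abort) throws IOException {
        delegate.close(abort);
      }

      @Override
      public SerDeStats getStats() {
        stats.setRowCount(rows); // raw data size left to the real format
        return stats;
      }
    }
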
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Tue Oct  1 04:48:44 2013
@@ -46,13 +46,14 @@ public class FilterOperator extends Oper
     FILTERED, PASSED
   }
 
-  private final transient LongWritable filtered_count, passed_count;
+  protected final transient LongWritable filtered_count;
+  protected final transient LongWritable passed_count;
   private transient ExprNodeEvaluator conditionEvaluator;
   private transient PrimitiveObjectInspector conditionInspector;
   private transient int consecutiveFails;
   private transient int consecutiveSearches;
   private transient IOContext ioContext;
-  transient int heartbeatInterval;
+  protected transient int heartbeatInterval;
 
   public FilterOperator() {
     super();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Tue Oct  1 04:48:44 2013
@@ -141,8 +141,16 @@ public class GroupByOperator extends Ope
   transient StructObjectInspector newKeyObjectInspector;
   transient StructObjectInspector currentKeyObjectInspector;
   public static MemoryMXBean memoryMXBean;
-  private long maxMemory;
-  private float memoryThreshold;
+
+  /**
+   * Total amount of memory allowed for JVM heap.
+   */
+  protected long maxMemory;
+
+  /**
+   * Configured fraction of maxMemory usable by the query processor.
+   */
+  protected float memoryThreshold;
 
   private boolean groupingSetsPresent;
   private int groupingSetsPosition;
@@ -159,10 +167,18 @@ public class GroupByOperator extends Ope
   transient List<Field>[] aggrPositions;
 
   transient int fixedRowSize;
-  transient long maxHashTblMemory;
+
+  /**
+   * Max memory usable by the hashtable before it should flush.
+   */
+  protected transient long maxHashTblMemory;
   transient int totalVariableSize;
   transient int numEntriesVarSize;
-  transient int numEntriesHashTable;
+
+  /**
+   * Current number of entries in the hash table.
+   */
+  protected transient int numEntriesHashTable;
   transient int countAfterReport;   // report or forward
   transient int heartbeatInterval;
 

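The new javadocs above spell out the memory bookkeeping knobs now exposed to subclasses
(notably the vectorized group-by). A hedged sketch of how such fields typically fit
together; the exact formula and configuration source in GroupByOperator may differ:

    import java.lang.management.ManagementFactory;

    // The aggregation hash table gets a configured fraction of the JVM heap
    // and should be flushed once its size estimate crosses that budget.
    final class HashAggMemoryBudget {
      private final long maxMemory =
          ManagementFactory.getMemoryMXBean().getHeapMemoryUsage().getMax();
      private final long maxHashTblMemory;

      HashAggMemoryBudget(float memoryThreshold) { // e.g. a configured 0.5f
        this.maxHashTblMemory = (long) (memoryThreshold * maxMemory);
      }

      boolean shouldFlush(long estimatedHashTableBytes) {
        return estimatedHashTableBytes > maxHashTblMemory;
      }
    }
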
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java Tue Oct  1 04:48:44 2013
@@ -22,9 +22,9 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 public abstract class KeyWrapper {
-  abstract void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException;
-  abstract void setHashKey();
-  abstract KeyWrapper copyKey();
-  abstract void copyKey(KeyWrapper oldWrapper);
-  abstract Object[] getKeyArray();
+  public abstract void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException;
+  public abstract void setHashKey();
+  public abstract KeyWrapper copyKey();
+  public abstract void copyKey(KeyWrapper oldWrapper);
+  public abstract Object[] getKeyArray();
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct  1 04:48:44 2013
@@ -103,7 +103,7 @@ public abstract class Operator<T extends
 
   protected transient State state = State.UNINIT;
 
-  static transient boolean fatalError = false; // fatalError is shared acorss
+  protected static transient boolean fatalError = false; // fatalError is shared across
   // all operators
 
   static {
@@ -1448,6 +1448,61 @@ public abstract class Operator<T extends
     return ret;
   }
 
+  /**
+   * Clones only the operator. The children and parent are set
+   * to null.
+   * @return Cloned operator
+   * @throws CloneNotSupportedException
+   */
+  public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
+    T descClone = (T) conf.clone();
+    Operator<? extends OperatorDesc> ret =
+        (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
+            descClone, getSchema());
+    return ret;
+  }
+
+  /**
+   * Recursively clones all the children of the tree, and fixes the pointers to
+   * children, to parents, and the pointers to itself coming from the children.
+   * It does not fix the pointers to itself coming from parents; parents continue
+   * to point to the original child.
+   * @return Cloned operator
+   * @throws CloneNotSupportedException
+   */
+  public Operator<? extends OperatorDesc> cloneRecursiveChildren()
+      throws CloneNotSupportedException {
+    Operator<? extends OperatorDesc> newOp = this.cloneOp();
+    newOp.setParentOperators(this.parentOperators);
+    // Fix parent in all children
+    if (this.getChildOperators() == null) {
+      newOp.setChildOperators(null);
+      return newOp;
+    }
+    List<Operator<? extends OperatorDesc>> newChildren =
+        new ArrayList<Operator<? extends OperatorDesc>>();
+
+    for (Operator<? extends OperatorDesc> childOp : this.getChildOperators()) {
+      List<Operator<? extends OperatorDesc>> parentList =
+          new ArrayList<Operator<? extends OperatorDesc>>();
+      for (Operator<? extends OperatorDesc> parent : childOp.getParentOperators()) {
+        if (parent.equals(this)) {
+          parentList.add(newOp);
+        } else {
+          parentList.add(parent);
+        }
+      }
+      // Recursively clone the children
+      Operator<? extends OperatorDesc> clonedChildOp = childOp.cloneRecursiveChildren();
+      clonedChildOp.setParentOperators(parentList);
+      newChildren.add(clonedChildOp);
+    }
+
+    newOp.setChildOperators(newChildren);
+    return newOp;
+  }
+
+
   /*
    * True only for operators which produce at most 1 output row per input
    * row to it. This will allow the output column names to be directly

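Because cloneRecursiveChildren() deliberately leaves the original parents pointing at the
old child, a caller that wants the clone to replace the original must rewire those parents
itself. A hedged usage sketch (cloneAndSplice is hypothetical; replaceChild is assumed to
be the existing Operator method for swapping a child in place):

    import java.util.List;

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.plan.OperatorDesc;

    final class OperatorCloneUtil {
      // Clone op's subtree, then point op's parents at the clone.
      static Operator<? extends OperatorDesc> cloneAndSplice(
          Operator<? extends OperatorDesc> op) throws CloneNotSupportedException {
        Operator<? extends OperatorDesc> clone = op.cloneRecursiveChildren();
        List<Operator<? extends OperatorDesc>> parents = op.getParentOperators();
        if (parents != null) {
          for (Operator<? extends OperatorDesc> parent : parents) {
            parent.replaceChild(op, clone); // rewire parent -> clone
          }
        }
        return clone;
      }
    }
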
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Oct  1 04:48:44 2013
@@ -22,12 +22,17 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
 import org.apache.hadoop.hive.ql.plan.CollectDesc;
-import org.apache.hadoop.hive.ql.plan.MuxDesc;
 import org.apache.hadoop.hive.ql.plan.DemuxDesc;
 import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
-import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExtractDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.FilterDesc;
 import org.apache.hadoop.hive.ql.plan.ForwardDesc;
@@ -39,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.La
 import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
 import org.apache.hadoop.hive.ql.plan.LimitDesc;
 import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MuxDesc;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -106,6 +112,38 @@ public final class OperatorFactory {
         MuxOperator.class));
   }
 
+  public static ArrayList<OpTuple> vectorOpvec;
+  static {
+    vectorOpvec = new ArrayList<OpTuple>();
+    vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
+    vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
+    vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class,
+        VectorReduceSinkOperator.class));
+    vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
+    vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
+  }
+
+  public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
+      VectorizationContext vContext) {
+    Class<T> descClass = (Class<T>) conf.getClass();
+    for (OpTuple o : vectorOpvec) {
+      if (o.descClass == descClass) {
+        try {
+          Operator<T> op = (Operator<T>) o.opClass.getDeclaredConstructor(
+              VectorizationContext.class, OperatorDesc.class).newInstance(
+              vContext, conf);
+          op.initializeCounters();
+          return op;
+        } catch (Exception e) {
+          e.printStackTrace();
+          throw new RuntimeException(e);
+        }
+      }
+    }
+    throw new RuntimeException("No vector operator for descriptor class "
+        + descClass.getName());
+  }
+
   public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
 
     for (OpTuple o : opvec) {

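For completeness, a hedged sketch of the new factory entry point in use. Obtaining a
VectorizationContext is elided here (vContext is assumed given), and the wrapper class is
hypothetical:

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
    import org.apache.hadoop.hive.ql.plan.FilterDesc;

    final class VectorOpExample {
      // Ask the factory for the vectorized twin of a FilterDesc. The call
      // throws RuntimeException if the descriptor class has no entry in
      // vectorOpvec (only Select, GroupBy, ReduceSink, FileSink and Filter
      // are registered by this patch).
      static Operator<FilterDesc> vectorFilter(FilterDesc desc,
          VectorizationContext vContext) {
        return OperatorFactory.getVectorOperator(desc, vContext);
      }
    }
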
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Oct  1 04:48:44 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.Text;
 
 /**
@@ -72,11 +73,11 @@ public class ReduceSinkOperator extends 
 
   // TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
   // ready
-  transient Serializer keySerializer;
-  transient boolean keyIsText;
-  transient Serializer valueSerializer;
+  protected transient Serializer keySerializer;
+  protected transient boolean keyIsText;
+  protected transient Serializer valueSerializer;
   transient int tag;
-  transient byte[] tagByte = new byte[1];
+  protected transient byte[] tagByte = new byte[1];
   transient protected int numDistributionKeys;
   transient protected int numDistinctExprs;
   transient String inputAlias;  // input alias of this RS for join (used for PPD)
@@ -163,12 +164,15 @@ public class ReduceSinkOperator extends 
   }
 
   transient InspectableObject tempInspectableObject = new InspectableObject();
-  transient HiveKey keyWritable = new HiveKey();
+  protected transient HiveKey keyWritable = new HiveKey();
+  protected transient Writable value;
 
   transient StructObjectInspector keyObjectInspector;
   transient StructObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
+  protected transient Object[] cachedValues;
+  protected transient List<List<Integer>> distinctColIndices;
   /**
    * This two dimensional array holds key data and a corresponding Union object
    * which contains the tag identifying the aggregate expression for distinct columns.
@@ -183,13 +187,9 @@ public class ReduceSinkOperator extends 
    * in this case, child GBY evaluates distinct values with expression like KEY.col2:0.dist1
    * see {@link ExprNodeColumnEvaluator}
    */
-  transient Object[][] cachedKeys;
-  transient Object[] cachedValues;
-  transient List<List<Integer>> distinctColIndices;
-
+  protected transient Object[][] cachedKeys;
   boolean firstRow;
-
-  transient Random random;
+  protected transient Random random;
 
   /**
    * Initializes array of ExprNodeEvaluator. Adds Union field for distinct

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/UnionOperator.java Tue Oct  1 04:48:44 2013
@@ -80,15 +80,20 @@ public class UnionOperator extends Opera
     for (int p = 0; p < parents; p++) {
       assert (parentFields[p].size() == columns);
       for (int c = 0; c < columns; c++) {
-        columnTypeResolvers[c].update(parentFields[p].get(c)
-            .getFieldObjectInspector());
+        if (!columnTypeResolvers[c].update(parentFields[p].get(c)
+            .getFieldObjectInspector())) {
+          // checked in SemanticAnalyzer. Should not happen
+          throw new HiveException("Incompatible types for union operator");
+        }
       }
     }
 
     ArrayList<ObjectInspector> outputFieldOIs = new ArrayList<ObjectInspector>(
         columns);
     for (int c = 0; c < columns; c++) {
-      outputFieldOIs.add(columnTypeResolvers[c].get());
+      // can be null for void type
+      ObjectInspector oi = columnTypeResolvers[c].get();
+      outputFieldOIs.add(oi == null ? parentFields[0].get(c).getFieldObjectInspector() : oi);
     }
 
     // create output row ObjectInspector

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct  1 04:48:44 2013
@@ -102,13 +102,13 @@ import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.Driver;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
 import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.exec.mr.MapRedTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
@@ -572,7 +572,7 @@ public final class Utilities {
     }
   }
 
-  private static Path getPlanPath(Configuration conf) {
+  public static Path getPlanPath(Configuration conf) {
     String plan = HiveConf.getVar(conf, HiveConf.ConfVars.PLAN);
     if (plan != null && !plan.isEmpty()) {
       return new Path(plan);
@@ -1697,7 +1697,7 @@ public final class Utilities {
 
     for (String p : paths) {
       Path path = new Path(p);
-      RecordWriter writer = HiveFileFormatUtils.getRecordWriter(
+      FSRecordWriter writer = HiveFileFormatUtils.getRecordWriter(
           jc, hiveOutputFormat, outputClass, isCompressed,
           tableInfo.getProperties(), path, reporter);
       writer.close(false);
@@ -2883,7 +2883,7 @@ public final class Utilities {
     Path newFilePath = new Path(newFile);
 
     String onefile = newPath.toString();
-    RecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
+    FSRecordWriter recWriter = outFileFormat.newInstance().getHiveRecordWriter(job, newFilePath,
         Text.class, false, props, null);
     if (dummyRow) {
       // empty files are omitted at CombineHiveInputFormat.

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Oct  1 04:48:44 2013
@@ -237,6 +237,7 @@ public class ExecDriver extends Task<Map
     ShimLoader.getHadoopShims().prepareJobOutput(job);
     //See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput()
     job.setOutputFormat(HiveOutputFormatImpl.class);
+
     job.setMapperClass(ExecMapper.class);
 
     job.setMapOutputKeyClass(HiveKey.class);
@@ -828,3 +829,4 @@ public class ExecDriver extends Task<Map
     }
   }
 }
+

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/PTFRowContainer.java Tue Oct  1 04:48:44 2013
@@ -28,8 +28,8 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
@@ -240,7 +240,7 @@ public class PTFRowContainer<Row extends
   }
 
 
-  private static class PTFRecordWriter implements RecordWriter {
+  private static class PTFRecordWriter implements FSRecordWriter {
     BytesWritable EMPTY_KEY = new BytesWritable();
 
     SequenceFile.Writer outStream;
@@ -262,7 +262,7 @@ public class PTFRowContainer<Row extends
     extends HiveSequenceFileOutputFormat<K,V> {
 
     @Override
-    public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+    public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
         Class<? extends Writable> valueClass, boolean isCompressed,
         Properties tableProperties, Progressable progress) throws IOException {
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Oct  1 04:48:44 2013
@@ -30,8 +30,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -105,7 +105,7 @@ public class RowContainer<ROW extends Li
   int acutalSplitNum = 0;
   int currentSplitPointer = 0;
   org.apache.hadoop.mapred.RecordReader rr = null; // record reader
-  RecordWriter rw = null;
+  FSRecordWriter rw = null;
   InputFormat<WritableComparable, Writable> inputFormat = null;
   InputSplit[] inputSplits = null;
   private ROW dummyRow = null;
@@ -531,7 +531,7 @@ public class RowContainer<ROW extends Li
 
   }
 
-  protected RecordWriter getRecordWriter() {
+  protected FSRecordWriter getRecordWriter() {
     return rw;
   }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveBinaryOutputFormat.java Tue Oct  1 04:48:44 2013
@@ -24,7 +24,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -43,7 +42,7 @@ public class HiveBinaryOutputFormat<K ex
   /**
    * create the final out file, and output row by row. After one row is
    * appended, a configured row separator is appended
-   * 
+   *
    * @param jc
    *          the job configuration file
    * @param outPath
@@ -59,14 +58,14 @@ public class HiveBinaryOutputFormat<K ex
    * @return the RecordWriter
    */
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
 
     FileSystem fs = outPath.getFileSystem(jc);
     final OutputStream outStream = fs.create(outPath);
 
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         if (r instanceof Text) {
           Text tr = (Text) r;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveFileFormatUtils.java Tue Oct  1 04:48:44 2013
@@ -32,7 +32,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat;
@@ -246,7 +245,7 @@ public final class HiveFileFormatUtils {
     return true;
   }
 
-  public static RecordWriter getHiveRecordWriter(JobConf jc,
+  public static FSRecordWriter getHiveRecordWriter(JobConf jc,
       TableDesc tableInfo, Class<? extends Writable> outputClass,
       FileSinkDesc conf, Path outPath, Reporter reporter) throws HiveException {
     boolean storagehandlerofhivepassthru = false;
@@ -287,7 +286,7 @@ public final class HiveFileFormatUtils {
     }
   }
 
-  public static RecordWriter getRecordWriter(JobConf jc,
+  public static FSRecordWriter getRecordWriter(JobConf jc,
       HiveOutputFormat<?, ?> hiveOutputFormat,
       final Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProp, Path outPath, Reporter reporter

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveIgnoreKeyTextOutputFormat.java Tue Oct  1 04:48:44 2013
@@ -25,7 +25,6 @@ import java.util.Properties;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.exec.Utilities;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
@@ -39,7 +38,7 @@ import org.apache.hadoop.util.Progressab
 /**
  * HiveIgnoreKeyTextOutputFormat replaces key with null before feeding the <key,
  * value> to TextOutputFormat.RecordWriter.
- * 
+ *
  */
 public class HiveIgnoreKeyTextOutputFormat<K extends WritableComparable, V extends Writable>
     extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {
@@ -47,7 +46,7 @@ public class HiveIgnoreKeyTextOutputForm
   /**
    * create the final out file, and output row by row. After one row is
    * appended, a configured row separator is appended
-   * 
+   *
    * @param jc
    *          the job configuration file
    * @param outPath
@@ -63,7 +62,7 @@ public class HiveIgnoreKeyTextOutputForm
    * @return the RecordWriter
    */
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path outPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
     int rowSeparator = 0;
@@ -79,7 +78,7 @@ public class HiveIgnoreKeyTextOutputForm
     FileSystem fs = outPath.getFileSystem(jc);
     final OutputStream outStream = Utilities.createCompressedStream(jc, fs
         .create(outPath), isCompressed);
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         if (r instanceof Text) {
           Text tr = (Text) r;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveNullValueSequenceFileOutputFormat.java Tue Oct  1 04:48:44 2013
@@ -23,7 +23,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.NullWritable;
@@ -48,7 +47,7 @@ public class HiveNullValueSequenceFileOu
   private boolean keyIsText;
 
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
 
@@ -58,7 +57,7 @@ public class HiveNullValueSequenceFileOu
 
     keyWritable = new HiveKey();
     keyIsText = valueClass.equals(Text.class);
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         if (keyIsText) {
           Text text = (Text) r;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveOutputFormat.java Tue Oct  1 04:48:44 2013
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Properties;
 
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.OutputFormat;
@@ -58,7 +57,7 @@ public interface HiveOutputFormat<K, V> 
    *          progress used for status report
    * @return the RecordWriter for the output file
    */
-  RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       final Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException;
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughOutputFormat.java Tue Oct  1 04:48:44 2013
@@ -49,7 +49,7 @@ public class HivePassThroughOutputFormat
                                   "org.apache.hadoop.hive.ql.io.HivePassThroughOutputFormat";
 
   public static final String HIVE_PASSTHROUGH_STORAGEHANDLER_OF_JOBCONFKEY =
-                                 "hive.passthrough.storagehandler.of"; 
+                                 "hive.passthrough.storagehandler.of";
 
   public HivePassThroughOutputFormat() {
     //construct this class through ReflectionUtils from FileSinkOperator
@@ -99,7 +99,7 @@ public class HivePassThroughOutputFormat
   }
 
   @Override
-  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+  public FSRecordWriter getHiveRecordWriter(
       JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
     if (this.initialized == false) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HivePassThroughRecordWriter.java Tue Oct  1 04:48:44 2013
@@ -20,13 +20,12 @@ package org.apache.hadoop.hive.ql.io;
 
 import java.io.IOException;
 
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 
 
 public class HivePassThroughRecordWriter <K extends WritableComparable<?>, V extends Writable>
-implements RecordWriter {
+implements FSRecordWriter {
 
   private final org.apache.hadoop.mapred.RecordWriter<K, V> mWriter;
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveSequenceFileOutputFormat.java Tue Oct  1 04:48:44 2013
@@ -23,7 +23,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.SequenceFile;
@@ -56,7 +55,7 @@ public class HiveSequenceFileOutputForma
    * @return the RecordWriter for the output file
    */
   @Override
-  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+  public FSRecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
       Class<? extends Writable> valueClass, boolean isCompressed,
       Properties tableProperties, Progressable progress) throws IOException {
 
@@ -64,7 +63,7 @@ public class HiveSequenceFileOutputForma
     final SequenceFile.Writer outStream = Utilities.createSequenceWriter(jc,
         fs, finalOutPath, BytesWritable.class, valueClass, isCompressed);
 
-    return new RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         outStream.append(EMPTY_KEY, r);
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/RCFileOutputFormat.java Tue Oct  1 04:48:44 2013
@@ -118,7 +118,7 @@ public class RCFileOutputFormat extends
    * @throws IOException
    */
   @Override
-  public org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter getHiveRecordWriter(
+  public FSRecordWriter getHiveRecordWriter(
       JobConf jc, Path finalOutPath, Class<? extends Writable> valueClass,
       boolean isCompressed, Properties tableProperties, Progressable progress) throws IOException {
 
@@ -135,7 +135,7 @@ public class RCFileOutputFormat extends
       (jc, finalOutPath.getFileSystem(jc),
        finalOutPath, isCompressed);
 
-    return new org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter() {
+    return new FSRecordWriter() {
       public void write(Writable r) throws IOException {
         outWriter.append(r);
       }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroContainerOutputFormat.java Tue Oct  1 04:48:44 2013
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.io.avro;
 
+import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
+import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
+import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
+
+import java.io.IOException;
+import java.util.Properties;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
@@ -24,7 +32,7 @@ import org.apache.avro.generic.GenericDa
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.hive.serde2.avro.AvroSerdeException;
@@ -36,14 +44,6 @@ import org.apache.hadoop.mapred.RecordWr
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.Progressable;
 
-import java.io.IOException;
-import java.util.Properties;
-
-import static org.apache.avro.file.DataFileConstants.DEFLATE_CODEC;
-import static org.apache.avro.mapred.AvroJob.OUTPUT_CODEC;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFAULT_DEFLATE_LEVEL;
-import static org.apache.avro.mapred.AvroOutputFormat.DEFLATE_LEVEL_KEY;
-
 /**
  * Write to an Avro file from a Hive process.
  */
@@ -51,7 +51,7 @@ public class AvroContainerOutputFormat
         implements HiveOutputFormat<LongWritable, AvroGenericRecordWritable> {
 
   @Override
-  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jobConf,
+  public FSRecordWriter getHiveRecordWriter(JobConf jobConf,
          Path path, Class<? extends Writable> valueClass, boolean isCompressed,
          Properties properties, Progressable progressable) throws IOException {
     Schema schema;
@@ -62,7 +62,7 @@ public class AvroContainerOutputFormat
     }
     GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(schema);
     DataFileWriter<GenericRecord> dfw = new DataFileWriter<GenericRecord>(gdw);
-    
+
     if (isCompressed) {
       int level = jobConf.getInt(DEFLATE_LEVEL_KEY, DEFAULT_DEFLATE_LEVEL);
       String codecName = jobConf.get(OUTPUT_CODEC, DEFLATE_CODEC);

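When compression is requested, the writer reads the deflate level and codec
name from the job configuration, falling back to DEFLATE_CODEC at
DEFAULT_DEFLATE_LEVEL. A sketch of the codec selection this sets up; the
setCodec continuation is inferred from the Avro API (CodecFactory,
DataFileWriter.setCodec), since the hunk is truncated here:

    // Inferred continuation: pick a codec from the configured name and
    // attach it to the DataFileWriter before the file is created.
    CodecFactory codec = codecName.equals(DEFLATE_CODEC)
        ? CodecFactory.deflateCodec(level)
        : CodecFactory.fromString(codecName);
    dfw.setCodec(codec);
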
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/avro/AvroGenericRecordWriter.java Tue Oct  1 04:48:44 2013
@@ -18,18 +18,18 @@
 package org.apache.hadoop.hive.ql.io.avro;
 
 
+import java.io.IOException;
+
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.FSRecordWriter;
 import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
 import org.apache.hadoop.io.Writable;
 
-import java.io.IOException;
-
 /**
  * Write an Avro GenericRecord to an Avro data file.
  */
-public class AvroGenericRecordWriter implements FileSinkOperator.RecordWriter{
+public class AvroGenericRecordWriter implements FSRecordWriter {
   final private DataFileWriter<GenericRecord> dfw;
 
   public AvroGenericRecordWriter(DataFileWriter<GenericRecord> dfw) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Tue Oct  1 04:48:44 2013
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.io.orc
 import java.io.EOFException;
 import java.io.IOException;
 
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
 class BitFieldReader {
-  private RunLengthByteReader input;
+  private final RunLengthByteReader input;
   private final int bitSize;
   private int current;
   private int bitsLeft;
@@ -60,6 +62,30 @@ class BitFieldReader {
     return result & mask;
   }
 
+  void nextVector(LongColumnVector previous, long previousLen)
+      throws IOException {
+
+    previous.isRepeating = true;
+    for (int i = 0; i < previousLen; i++) {
+      if (!previous.isNull[i]) {
+        previous.vector[i] = next();
+      } else {
+        // In vectorized processing, null int values are represented by the
+        // default value 1, so write 1 when the value is null.
+        previous.vector[i] = 1;
+      }
+
+      // A non-null value can also legitimately be 1, so isNull must be
+      // checked in addition to the values themselves when determining
+      // the isRepeating flag.
+      if (previous.isRepeating
+          && i > 0
+          && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+        previous.isRepeating = false;
+      }
+    }
+  }
+
   void seek(PositionProvider index) throws IOException {
     input.seek(index);
     int consumed = (int) index.getNext();

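nextVector fills a LongColumnVector and recomputes its isRepeating flag.
Because nulls are encoded as 1 and a genuine value can also be 1, entries only
count as repeating when both vector[] and isNull[] agree. A small illustration
using only the LongColumnVector fields referenced above:

    // Two entries with equal encoded values but different null flags must
    // not be treated as repeating, or the null would be masked.
    LongColumnVector col = new LongColumnVector(2);
    col.vector[0] = 1; col.isNull[0] = false;  // genuine value 1
    col.vector[1] = 1; col.isNull[1] = true;   // null, encoded as 1
    boolean repeating = (col.vector[0] == col.vector[1])
        && (col.isNull[0] == col.isNull[1]);   // false: null flags differ
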
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Tue Oct  1 04:48:44 2013
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
-import org.apache.hadoop.io.Text;
-
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.hadoop.io.Text;
+
 /**
  * A class that is a growable array of bytes. Growth is managed in terms of
  * chunks that are allocated when needed.
@@ -237,6 +237,7 @@ final class DynamicByteArray {
     }
   }
 
+  @Override
   public String toString() {
     int i;
     StringBuilder sb = new StringBuilder(length * 3);
@@ -268,10 +269,35 @@ final class DynamicByteArray {
   }
 
   /**
+   * Gets all the bytes of the array.
+   *
+   * @return Bytes of the array
+   */
+  public byte[] get() {
+    byte[] result = null;
+    if (length > 0) {
+      int currentChunk = 0;
+      int currentOffset = 0;
+      int currentLength = Math.min(length, chunkSize);
+      int destOffset = 0;
+      result = new byte[length];
+      int totalLength = length;
+      while (totalLength > 0) {
+        System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+        destOffset += currentLength;
+        totalLength -= currentLength;
+        currentChunk += 1;
+        currentOffset = 0;
+        currentLength = Math.min(totalLength, chunkSize - currentOffset);
+      }
+    }
+    return result;
+  }
+
+  /**
    * Get the size of the buffers.
    */
   public long getSizeInBytes() {
     return initializedChunks * chunkSize;
   }
 }
-

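The new DynamicByteArray.get() walks the chunk list, copying at most chunkSize
bytes per step into a single result array. The same traversal over a plain
byte[][], as a standalone sketch with illustrative names:

    // Flatten `length` bytes stored across fixed-size chunks into one array.
    static byte[] flatten(byte[][] chunks, int chunkSize, int length) {
      byte[] result = new byte[length];
      int chunk = 0, destOffset = 0, remaining = length;
      while (remaining > 0) {
        int n = Math.min(remaining, chunkSize);
        System.arraycopy(chunks[chunk], 0, result, destOffset, n);
        destOffset += n;
        remaining -= n;
        chunk++;
      }
      return result;
    }
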
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Tue Oct  1 04:48:44 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
 
 /**
  * Interface for reading integers.
@@ -52,4 +53,12 @@ interface IntegerReader {
    * @throws IOException
    */
   long next() throws IOException;
+
+  /**
+   * Fill the given vector with the next run of available values.
+   * @param previous the vector to populate; previousLen entries are read
+   * @throws IOException
+   */
+  void nextVector(LongColumnVector previous, long previousLen)
+      throws IOException;
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1527883&r1=1527882&r2=1527883&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Oct  1 04:48:44 2013
@@ -27,7 +27,6 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -38,6 +37,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -54,12 +55,13 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
 import org.apache.hadoop.util.StringUtils;
-
 /**
  * A MapReduce/Hive input format for ORC files.
  */
-public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
-                                       InputFormatChecker {
+public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
+    InputFormatChecker, VectorizedInputFormatInterface {
+
+  VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
 
   private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
   static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
@@ -85,6 +87,7 @@ public class OrcInputFormat implements I
     private final int numColumns;
     private float progress = 0.0f;
 
+
     OrcRecordReader(Reader file, Configuration conf,
                     long offset, long length) throws IOException {
       String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
@@ -219,6 +222,12 @@ public class OrcInputFormat implements I
   public RecordReader<NullWritable, OrcStruct>
       getRecordReader(InputSplit inputSplit, JobConf conf,
                       Reporter reporter) throws IOException {
+    if (isVectorMode(conf)) {
+      RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
+          reporter);
+      return (RecordReader) vorr;
+    }
+
     FileSplit fileSplit = (FileSplit) inputSplit;
     Path path = fileSplit.getPath();
     FileSystem fs = path.getFileSystem(conf);
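
getRecordReader now dispatches on the plan's vector mode: when set, the
embedded VectorizedOrcInputFormat supplies a reader that emits
VectorizedRowBatch values instead of OrcStruct rows, and the raw cast bridges
the two value types under the one InputFormat signature. A condensed sketch of
the dispatch; rowRecordReader is an illustrative stand-in for the
row-at-a-time path above, not an actual method:

    // One facade, two reader kinds: batch reader when vectorized, row
    // reader otherwise. The raw cast is what reconciles the value types.
    if (isVectorMode(conf)) {
      return (RecordReader) voif.getRecordReader(inputSplit, conf, reporter);
    }
    return rowRecordReader(inputSplit, conf, reporter);
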
@@ -231,6 +240,11 @@ public class OrcInputFormat implements I
   public boolean validateInput(FileSystem fs, HiveConf conf,
                                ArrayList<FileStatus> files
                               ) throws IOException {
+
+    if (isVectorMode(conf)) {
+      return voif.validateInput(fs, conf, files);
+    }
+
     if (files.size() <= 0) {
       return false;
     }
@@ -244,6 +258,14 @@ public class OrcInputFormat implements I
     return true;
   }
 
+  private boolean isVectorMode(Configuration conf) {
+    return Utilities.getPlanPath(conf) != null
+        && Utilities.getMapRedWork(conf).getMapWork().getVectorMode();
+  }
+
   /**
    * Get the list of input {@link Path}s for the map-reduce job.
    *