Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/30 21:46:17 UTC

svn commit: r1519056 [2/3] - in /hive/branches/tez: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ hbase-handler/src/test/queries/positive/ hbase-handler/src/test/results/positive/ hcatalog/ hcatalog/build-support/ant/ hcatalog/core/ hcatalog/co...

Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java Fri Aug 30 19:46:15 2013
@@ -79,23 +79,39 @@ class MetaStoreDirectSql {
   public List<Partition> getPartitionsViaSqlFilter(
       String dbName, String tblName, List<String> partNames) throws MetaException {
     String list = repeat(",?", partNames.size()).substring(1);
-    return getPartitionsViaSqlFilterInternal(dbName, tblName,
-        "and PARTITIONS.PART_NAME in (" + list + ")" , partNames, new ArrayList<String>());
+    return getPartitionsViaSqlFilterInternal(dbName, tblName, null,
+        "and PARTITIONS.PART_NAME in (" + list + ")", partNames, new ArrayList<String>());
   }
 
   /**
    * Gets partitions by using direct SQL queries.
-   * @param dbName Metastore db name.
-   * @param tblName Metastore table name.
+   * @param table The table.
    * @param parser The parsed filter from which the SQL filter will be generated.
    * @return List of partitions.
    */
-  public List<Partition> getPartitionsViaSqlFilter(Table table, String dbName,
-      String tblName, FilterParser parser) throws MetaException {
+  public List<Partition> getPartitionsViaSqlFilter(
+      Table table, FilterParser parser) throws MetaException {
     List<String> params = new ArrayList<String>(), joins = new ArrayList<String>();
     String sqlFilter = (parser == null) ? null
         : PartitionFilterGenerator.generateSqlFilter(table, parser.tree, params, joins);
-    return getPartitionsViaSqlFilterInternal(dbName, tblName, sqlFilter, params, joins);
+    return getPartitionsViaSqlFilterInternal(table.getDbName(), table.getTableName(),
+        isViewTable(table), sqlFilter, params, joins);
+  }
+
+  private static Boolean isViewTable(Table t) {
+    return t.isSetTableType() ?
+        t.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) : null;
+  }
+
+  private boolean isViewTable(String dbName, String tblName) throws MetaException {
+    String queryText = "select TBL_TYPE from TBLS" +
+        " inner join DBS on TBLS.DB_ID = DBS.DB_ID " +
+        " where TBLS.TBL_NAME = ? and DBS.NAME = ?";
+    Object[] params = new Object[] { tblName, dbName };
+    Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+    query.setUnique(true);
+    Object result = query.executeWithArray(params);
+    return (result != null) && result.toString().equals(TableType.VIRTUAL_VIEW.toString());
   }
 
   /**
@@ -103,14 +119,16 @@ class MetaStoreDirectSql {
    * queries created by DN retrieving stuff for each object individually.
    * @param dbName Metastore db name.
    * @param tblName Metastore table name.
+   * @param isView Whether the table is a view. Can be passed as null if not immediately
+   *               known; in that case this method looks it up only if necessary.
    * @param sqlFilter SQL filter to use. Better be SQL92-compliant. Can be null.
    * @param paramsForFilter params for ?-s in SQL filter text. Params must be in order.
    * @param joinsForFilter if the filter needs additional join statement, they must be in
    *                       this list. Better be SQL92-compliant.
    * @return List of partition objects. FieldSchema is currently not populated.
    */
-  private List<Partition> getPartitionsViaSqlFilterInternal(String dbName,
-      String tblName, String sqlFilter, List<String> paramsForFilter,
+  private List<Partition> getPartitionsViaSqlFilterInternal(String dbName, String tblName,
+      Boolean isView, String sqlFilter, List<String> paramsForFilter,
       List<String> joinsForFilter) throws MetaException {
     boolean doTrace = LOG.isDebugEnabled();
     // Get all simple fields for partitions and related objects, which we can map one-on-one.
@@ -191,9 +209,15 @@ class MetaStoreDirectSql {
       Long sdId = (Long)fields[1];
       Long colId = (Long)fields[2];
       Long serdeId = (Long)fields[3];
+      // A partition must have either everything set, or nothing set if it's a view.
       if (sdId == null || colId == null || serdeId == null) {
-        throw new MetaException("Unexpected null for one of the IDs, SD " + sdId
-            + ", column " + colId + ", serde " + serdeId);
+        if (isView == null) {
+          isView = isViewTable(dbName, tblName);
+        }
+        if ((sdId != null || colId != null || serdeId != null) || !isView) {
+          throw new MetaException("Unexpected null for one of the IDs, SD " + sdId + ", column "
+              + colId + ", serde " + serdeId + " for a " + (isView ? "" : "non-") + "view");
+        }
       }
 
       Partition part = new Partition();
@@ -207,6 +231,9 @@ class MetaStoreDirectSql {
       if (fields[5] != null) part.setLastAccessTime((Integer)fields[5]);
       partitions.put(partitionId, part);
 
+      if (sdId == null) continue; // Probably a view.
+      assert colId != null && serdeId != null;
+
       // We assume each partition has an unique SD.
       StorageDescriptor sd = new StorageDescriptor();
       StorageDescriptor oldSd = sds.put(sdId, sd);
@@ -257,10 +284,6 @@ class MetaStoreDirectSql {
           (System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [ " + queryText + "]");
     }
 
-    // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
-    String sdIds = trimCommaList(sdSb), serdeIds = trimCommaList(serdeSb),
-        colIds = trimCommaList(colsSb);
-
     // Now get all the one-to-many things. Start with partitions.
     queryText = "select PART_ID, PARAM_KEY, PARAM_VALUE from PARTITION_PARAMS where PART_ID in ("
         + partIds + ") and PARAM_KEY is not null order by PART_ID asc";
@@ -276,6 +299,14 @@ class MetaStoreDirectSql {
         t.addToValues((String)fields[1]);
       }});
 
+    // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
+    if (sdSb.length() == 0) {
+      assert serdeSb.length() == 0 && colsSb.length() == 0;
+      return orderedResult; // No SDs, probably a view.
+    }
+    String sdIds = trimCommaList(sdSb), serdeIds = trimCommaList(serdeSb),
+        colIds = trimCommaList(colsSb);
+
     // Get all the stuff for SD. Don't do empty-list check - we expect partitions do have SDs.
     queryText = "select SD_ID, PARAM_KEY, PARAM_VALUE from SD_PARAMS where SD_ID in ("
         + sdIds + ") and PARAM_KEY is not null order by SD_ID asc";
@@ -341,7 +372,7 @@ class MetaStoreDirectSql {
             if (currentListId == null || fieldsListId != currentListId) {
               currentList = new ArrayList<String>();
               currentListId = fieldsListId;
-              t.getSkewedInfo().addToSkewedColValues(currentList); // TODO#: here
+              t.getSkewedInfo().addToSkewedColValues(currentList);
             }
             currentList.add((String)fields[2]);
           }
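
The change above threads a tri-state Boolean through getPartitionsViaSqlFilterInternal: true/false when the caller already knows whether the table is a view, null when it does not, in which case the extra TBLS/DBS lookup runs only if a partition row actually comes back with null SD/column/serde IDs. Below is a minimal, self-contained sketch of that lazy check; the names are hypothetical and queryIsView merely stands in for the metastore SQL lookup.

    // Hypothetical sketch of the lazy tri-state view check; queryIsView stands in
    // for the extra metastore SQL lookup and is not a real metastore call.
    class LazyViewCheckSketch {

      private boolean queryIsView(String dbName, String tblName) {
        return false; // pretend lookup against TBLS/DBS
      }

      void validateRow(String dbName, String tblName,
                       Long sdId, Long colId, Long serdeId, Boolean isView) {
        if (sdId == null || colId == null || serdeId == null) {
          if (isView == null) {                  // resolve only when actually needed
            isView = queryIsView(dbName, tblName);
          }
          boolean partiallySet = sdId != null || colId != null || serdeId != null;
          if (partiallySet || !isView) {         // views may have nothing set; anything else is an error
            throw new IllegalStateException("Unexpected null storage descriptor IDs");
          }
        }
      }
    }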

Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Fri Aug 30 19:46:15 2013
@@ -1659,9 +1659,17 @@ public class ObjectStore implements RawS
   @Override
   public List<Partition> getPartitionsByNames(String dbName, String tblName,
       List<String> partNames) throws MetaException, NoSuchObjectException {
+    return getPartitionsByNamesInternal(dbName, tblName, partNames, true, true);
+  }
+
+  protected List<Partition> getPartitionsByNamesInternal(String dbName, String tblName,
+      List<String> partNames, boolean allowSql, boolean allowJdo)
+          throws MetaException, NoSuchObjectException {
+    assert allowSql || allowJdo;
     boolean doTrace = LOG.isDebugEnabled();
     List<Partition> results = null;
-    boolean doUseDirectSql = HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
+    boolean doUseDirectSql = allowSql
+        && HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
 
     boolean success = false;
     try {
@@ -1671,7 +1679,13 @@ public class ObjectStore implements RawS
         try {
           results = directSql.getPartitionsViaSqlFilter(dbName, tblName, partNames);
         } catch (Exception ex) {
-          LOG.error("Direct SQL failed, falling back to ORM", ex);
+          LOG.error("Direct SQL failed" + (allowJdo ? ", falling back to ORM" : ""), ex);
+          if (!allowJdo) {
+            if (ex instanceof MetaException) {
+              throw (MetaException)ex;
+            }
+            throw new MetaException(ex.getMessage());
+          }
           doUseDirectSql = false;
           rollbackTransaction();
           start = doTrace ? System.nanoTime() : 0;
@@ -1734,9 +1748,16 @@ public class ObjectStore implements RawS
   @Override
   public List<Partition> getPartitionsByFilter(String dbName, String tblName,
       String filter, short maxParts) throws MetaException, NoSuchObjectException {
+    return getPartitionsByFilterInternal(dbName, tblName, filter, maxParts, true, true);
+  }
+
+  protected List<Partition> getPartitionsByFilterInternal(String dbName, String tblName,
+      String filter, short maxParts, boolean allowSql, boolean allowJdo)
+      throws MetaException, NoSuchObjectException {
+    assert allowSql || allowJdo;
     boolean doTrace = LOG.isDebugEnabled();
     // There's no portable SQL limit. It doesn't make a lot of sense w/o offset anyway.
-    boolean doUseDirectSql = (maxParts < 0)
+    boolean doUseDirectSql = allowSql && (maxParts < 0)
         && HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
     dbName = dbName.toLowerCase();
     tblName = tblName.toLowerCase();
@@ -1755,9 +1776,15 @@ public class ObjectStore implements RawS
       if (doUseDirectSql) {
         try {
           Table table = convertToTable(mtable);
-          results = directSql.getPartitionsViaSqlFilter(table, dbName, tblName, parser);
+          results = directSql.getPartitionsViaSqlFilter(table, parser);
         } catch (Exception ex) {
-          LOG.error("Direct SQL failed, falling back to ORM", ex);
+          LOG.error("Direct SQL failed" + (allowJdo ? ", falling back to ORM" : ""), ex);
+          if (!allowJdo) {
+            if (ex instanceof MetaException) {
+              throw (MetaException)ex;
+            }
+            throw new MetaException(ex.getMessage());
+          }
           doUseDirectSql = false;
           rollbackTransaction();
           start = doTrace ? System.nanoTime() : 0;
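
Both getPartitionsByNames and getPartitionsByFilter now delegate to *Internal variants that take allowSql/allowJdo flags: try the direct-SQL path first, and on failure either rethrow as a MetaException (when JDO is not allowed) or fall back to the ORM path. A generic sketch of that shape, with hypothetical names and no Hive types:

    // Generic "fast path with fallback" sketch; fetch, directSql and jdo are hypothetical.
    import java.util.concurrent.Callable;

    class FallbackSketch {
      static <T> T fetch(boolean allowSql, boolean allowJdo,
                         Callable<T> directSql, Callable<T> jdo) throws Exception {
        assert allowSql || allowJdo;
        if (allowSql) {
          try {
            return directSql.call();
          } catch (Exception ex) {
            if (!allowJdo) {
              throw ex;          // no fallback allowed: surface the failure to the caller
            }
            // log and fall through to the ORM path
          }
        }
        return jdo.call();
      }
    }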

Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java Fri Aug 30 19:46:15 2013
@@ -23,7 +23,9 @@ import java.lang.reflect.InvocationTarge
 import java.lang.reflect.Method;
 import java.lang.reflect.Proxy;
 import java.lang.reflect.UndeclaredThrowableException;
+import java.util.List;
 
+import org.apache.commons.lang.ClassUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -70,8 +72,19 @@ public class RetryingRawStore implements
 
     RetryingRawStore handler = new RetryingRawStore(hiveConf, conf, baseClass, id);
 
-    return (RawStore) Proxy.newProxyInstance(RetryingRawStore.class.getClassLoader()
-        , baseClass.getInterfaces(), handler);
+    // Look for interfaces on both the class and all base classes.
+    return (RawStore) Proxy.newProxyInstance(RetryingRawStore.class.getClassLoader(),
+        getAllInterfaces(baseClass), handler);
+  }
+
+  private static Class<?>[] getAllInterfaces(Class<?> baseClass) {
+    List interfaces = ClassUtils.getAllInterfaces(baseClass);
+    Class<?>[] result = new Class<?>[interfaces.size()];
+    int i = 0;
+    for (Object o : interfaces) {
+      result[i++] = (Class<?>)o;
+    }
+    return result;
   }
 
   private void init() throws MetaException {
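
Proxy.newProxyInstance only proxies the interfaces it is handed, and Class.getInterfaces() returns just the interfaces declared directly on that class, so interfaces inherited from base classes were previously missed; commons-lang ClassUtils.getAllInterfaces walks the whole hierarchy. A JDK-only sketch of roughly what that walk does (illustrative, not the commons-lang implementation):

    import java.util.LinkedHashSet;
    import java.util.Set;

    // Illustrative walk over a class, its superclasses and their interfaces.
    class InterfaceWalk {
      static Class<?>[] allInterfaces(Class<?> c) {
        Set<Class<?>> out = new LinkedHashSet<Class<?>>();
        for (Class<?> cur = c; cur != null; cur = cur.getSuperclass()) {
          for (Class<?> i : cur.getInterfaces()) {
            collect(i, out);
          }
        }
        return out.toArray(new Class<?>[out.size()]);
      }

      private static void collect(Class<?> i, Set<Class<?>> out) {
        if (out.add(i)) {
          for (Class<?> parent : i.getInterfaces()) {
            collect(parent, out);   // interfaces can themselves extend other interfaces
          }
        }
      }
    }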

Modified: hive/branches/tez/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/build.xml?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/build.xml (original)
+++ hive/branches/tez/ql/build.xml Fri Aug 30 19:46:15 2013
@@ -250,15 +250,24 @@
         <exclude name="META-INF/MANIFEST.MF"/>
       </patternset>
     </unzip>
-    <unzip 
-      src="${build.ivy.lib.dir}/default/protobuf-java-${protobuf.version}.jar" 
+    <unzip
+      src="${build.ivy.lib.dir}/default/protobuf-java-${protobuf.version}.jar"
       dest="${build.dir.hive}/protobuf-java/classes">
       <patternset>
         <exclude name="META-INF"/>
         <exclude name="META-INF/MANIFEST.MF"/>
       </patternset>
     </unzip>
-    <unzip 
+    <unzip
+      src="${build.ivy.lib.dir}/default/guava-${guava.version}.jar"
+      dest="${build.dir.hive}/guava/classes">
+      <patternset>
+        <exclude name="META-INF"/>
+        <exclude name="META-INF/MANIFEST.MF"/>
+      </patternset>
+    </unzip>
+
+    <unzip
       src="${build.ivy.lib.dir}/default/snappy-${snappy.version}.jar" 
       dest="${build.dir.hive}/snappy/classes">
       <patternset>
@@ -296,14 +305,11 @@
       <fileset dir="${build.dir.hive}/shims/classes" includes="**/*.class"/>
       <fileset dir="${build.dir.hive}/javaewah/classes" includes="**/*.class"/>
       <fileset dir="${build.dir.hive}/javolution/classes" includes="**/*.class"/>
-      <fileset dir="${build.dir.hive}/protobuf-java/classes" 
-               includes="**/*.class"/>
-      <fileset dir="${build.dir.hive}/snappy/classes" 
-               includes="**/*.class"/>
-      <fileset dir="${build.dir.hive}/jackson-core-asl/classes"
-      	       includes="**/*.class"/>
-      <fileset dir="${build.dir.hive}/jackson-mapper-asl/classes"
-                 includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/protobuf-java/classes" includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/snappy/classes" includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/jackson-core-asl/classes" includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/jackson-mapper-asl/classes" includes="**/*.class"/>
+      <fileset dir="${build.dir.hive}/guava/classes" includes="**/*.class"/>
       <manifest>
         <!-- Not putting these in their own manifest section, since that inserts
              a new-line, which breaks the reading of the attributes. -->

Modified: hive/branches/tez/ql/ivy.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/ivy.xml?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/ivy.xml (original)
+++ hive/branches/tez/ql/ivy.xml Fri Aug 30 19:46:15 2013
@@ -61,8 +61,6 @@
     <dependency org="org.apache.hadoop" name="hadoop-yarn-client" rev="${hadoop-0.23.version}" 
                 conf="compile->master" transitive="false" />
 
-
-    <!-- hadoop specific guava -->
     <dependency org="org.json" name="json" rev="${json.version}"/>
     <dependency org="commons-collections" name="commons-collections" rev="${commons-collections.version}"/>
     <dependency org="commons-configuration" name="commons-configuration" rev="${commons-configuration.version}"
@@ -95,6 +93,8 @@
       <exclude org="org.apache.commons" module="commons-daemon"/><!--bad POM-->
     </dependency>
 
+    <dependency org="com.google.guava" name="guava" rev="${guava.version}" transitive="false"/>
+
     <!-- Test Dependencies -->
     <dependency org="junit" name="junit" rev="${junit.version}" conf="test->default" />
     

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Fri Aug 30 19:46:15 2013
@@ -50,6 +50,11 @@ public class ExtractOperator extends Ope
     return OperatorType.EXTRACT;
   }
 
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
+
   /**
    * @return the name of the operator
    */

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Fri Aug 30 19:46:15 2013
@@ -41,6 +41,11 @@ public class ForwardOperator extends Ope
     return OperatorType.FORWARD;
   }
 
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
+
   /**
    * @return the name of the operator
    */

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Aug 30 19:46:15 2013
@@ -1181,4 +1181,15 @@ public class GroupByOperator extends Ope
   public OperatorType getType() {
     return OperatorType.GROUPBY;
   }
+
+  /**
+   * We can push the limit above a GBY running in the reducer, since it generates a single row
+   * per group. This doesn't necessarily hold for a GBY running in mappers,
+   * so we don't push the limit above that.
+   */
+  @Override
+  public boolean acceptLimitPushdown() {
+    return getConf().getMode() == GroupByDesc.Mode.MERGEPARTIAL ||
+        getConf().getMode() == GroupByDesc.Mode.COMPLETE;
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Fri Aug 30 19:46:15 2013
@@ -92,6 +92,12 @@ public class HashTableSinkOperator exten
 
   private transient MapJoinTableContainer[] mapJoinTables;
   private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;  
+
+  private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+  private static final MapJoinRowContainer EMPTY_ROW_CONTAINER = new MapJoinRowContainer();
+  static {
+    EMPTY_ROW_CONTAINER.add(EMPTY_OBJECT_ARRAY);
+  }
   
   private transient boolean noOuterJoin;
 
@@ -223,19 +229,30 @@ public class HashTableSinkOperator exten
     // compute keys and values as StandardObjects
     MapJoinKey key = JoinUtil.computeMapJoinKeys(null, row, joinKeys[alias],
         joinKeysObjectInspectors[alias]);
-    Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
+    Object[] value = EMPTY_OBJECT_ARRAY;
+    if((hasFilter(alias) && filterMaps[alias].length > 0) || joinValues[alias].size() > 0) {
+      value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
         joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
         filterMaps == null ? null : filterMaps[alias]);
+    }
     MapJoinTableContainer tableContainer = mapJoinTables[alias];
     MapJoinRowContainer rowContainer = tableContainer.get(key);
     if (rowContainer == null) {
-      rowContainer = new MapJoinRowContainer();
-      rowContainer.add(value);
+      if(value.length != 0) {
+        rowContainer = new MapJoinRowContainer();
+        rowContainer.add(value);
+      } else {
+        rowContainer = EMPTY_ROW_CONTAINER;
+      }
       rowNumber++;
       if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
         memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
       }
       tableContainer.put(key, rowContainer);
+    } else if (rowContainer == EMPTY_ROW_CONTAINER) {
+      rowContainer = rowContainer.copy();
+      rowContainer.add(value);
+      tableContainer.put(key, rowContainer);
     } else {
       rowContainer.add(value);
     }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Aug 30 19:46:15 2013
@@ -602,6 +602,9 @@ public abstract class Operator<T extends
     state = State.CLOSE;
     LOG.info(id + " finished. closing... ");
 
+    // call the operator specific close routine
+    closeOp(abort);
+
     if (counterNameToEnum != null) {
       incrCounter(numInputRowsCntr, inputRows);
       incrCounter(numOutputRowsCntr, outputRows);
@@ -610,9 +613,6 @@ public abstract class Operator<T extends
 
     LOG.info(id + " forwarded " + cntr + " rows");
 
-    // call the operator specific close routine
-    closeOp(abort);
-
     try {
       logStats();
       if (childOperators == null) {
@@ -826,13 +826,7 @@ public abstract class Operator<T extends
       }
     }
 
-    if (isLogInfoEnabled) {
-      cntr++;
-      if (cntr == nextCntr) {
-        LOG.info(id + " forwarding " + cntr + " rows");
-        nextCntr = getNextCntr(cntr);
-      }
-    }
+    increaseForward(1);
 
     // For debugging purposes:
     // System.out.println("" + this.getClass() + ": " +
@@ -865,6 +859,18 @@ public abstract class Operator<T extends
     }
   }
 
+  void increaseForward(long counter) {
+    if (isLogInfoEnabled) {
+      cntr += counter;
+      if (cntr >= nextCntr) {
+        LOG.info(id + " forwarding " + cntr + " rows");
+        do {
+          nextCntr = getNextCntr(nextCntr);
+        } while(cntr >= nextCntr);
+      }
+    }
+  }
+
   public void resetStats() {
     for (Enum<?> e : statsMap.keySet()) {
       statsMap.get(e).set(0L);
@@ -1525,6 +1531,17 @@ public abstract class Operator<T extends
     return stats;
   }
 
+  /**
+   * Used by LimitPushdownOptimizer.
+   *
+   * If none of the operators between the limit and the reduce-sink removes any input rows
+   * within the limit count, the limit can be pushed down to the reduce-sink operator
+   * (e.g. forward, select).
+   */
+  public boolean acceptLimitPushdown() {
+    return false;
+  }
+
   @Override
   public String toString() {
     return getName() + "[" + getIdentifier() + "]";
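
acceptLimitPushdown() is the hook LimitPushdownOptimizer can use: walking from the limit back toward the reduce sink, the limit may be pushed only if every operator on the path promises not to drop rows within the limit count (forward, select and reduce-side group-by return true; the default is false). A hedged sketch of such a walk, with a hypothetical Op interface rather than the real operator classes:

    import java.util.List;

    // Hypothetical sketch of the path check a limit-pushdown pass could perform.
    class LimitPushdownSketch {
      interface Op {
        boolean acceptLimitPushdown();
        List<Op> getParents();
      }

      // True if every operator on the single-parent chain up to (but excluding)
      // the reduce sink accepts the pushdown.
      static boolean canPushLimit(Op limitParent, Op reduceSink) {
        for (Op cur = limitParent; cur != reduceSink; ) {
          if (!cur.acceptLimitPushdown()) {
            return false;
          }
          List<Op> parents = cur.getParents();
          if (parents == null || parents.size() != 1) {
            return false;             // give up on branching plans in this sketch
          }
          cur = parents.get(0);
        }
        return true;
      }
    }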

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Fri Aug 30 19:46:15 2013
@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
@@ -44,11 +42,9 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 
-public class PTFOperator extends Operator<PTFDesc> implements Serializable
-{
+public class PTFOperator extends Operator<PTFDesc> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 	PTFPartition inputPart;
@@ -67,8 +63,7 @@ public class PTFOperator extends Operato
 	 * 4. Create input partition to store rows coming from previous operator
 	 */
 	@Override
-	protected void initializeOp(Configuration jobConf) throws HiveException
-	{
+	protected void initializeOp(Configuration jobConf) throws HiveException {
 		hiveConf = new HiveConf(jobConf, PTFOperator.class);
 		// if the parent is ExtractOperator, this invocation is from reduce-side
 		Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
@@ -78,13 +73,10 @@ public class PTFOperator extends Operato
     inputPart = createFirstPartitionForChain(
         inputObjInspectors[0], hiveConf, isMapOperator);
 
-		if (isMapOperator)
-		{
+		if (isMapOperator) {
 			PartitionedTableFunctionDef tDef = conf.getStartOfChain();
 			outputObjInspector = tDef.getRawInputShape().getOI();
-		}
-		else
-		{
+		} else {
 			outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
 		}
 
@@ -94,16 +86,12 @@ public class PTFOperator extends Operato
 	}
 
 	@Override
-	protected void closeOp(boolean abort) throws HiveException
-	{
+	protected void closeOp(boolean abort) throws HiveException {
 		super.closeOp(abort);
     if(inputPart.size() != 0){
-      if (isMapOperator)
-      {
+      if (isMapOperator) {
         processMapFunction();
-      }
-      else
-      {
+      } else {
         processInputPartition();
       }
     }
@@ -113,8 +101,7 @@ public class PTFOperator extends Operato
 	@Override
 	public void processOp(Object row, int tag) throws HiveException
 	{
-	  if (!isMapOperator )
-    {
+	  if (!isMapOperator ) {
       /*
        * checkif current row belongs to the current accumulated Partition:
        * - If not:
@@ -126,20 +113,15 @@ public class PTFOperator extends Operato
       boolean keysAreEqual = (currentKeys != null && newKeys != null)?
               newKeys.equals(currentKeys) : false;
 
-      if (currentKeys != null && !keysAreEqual)
-      {
+      if (currentKeys != null && !keysAreEqual) {
         processInputPartition();
         inputPart.reset();
       }
 
-      if (currentKeys == null || !keysAreEqual)
-      {
-        if (currentKeys == null)
-        {
+      if (currentKeys == null || !keysAreEqual) {
+        if (currentKeys == null) {
           currentKeys = newKeys.copyKey();
-        }
-        else
-        {
+        } else {
           currentKeys.copyKey(newKeys);
         }
       }
@@ -156,16 +138,14 @@ public class PTFOperator extends Operato
 	 * @param hiveConf
 	 * @throws HiveException
 	 */
-	protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
-	{
+	protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException {
 
 	  PTFDeserializer dS =
 	      new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
 	  dS.initializePTFChain(conf.getFuncDef());
 	}
 
-	protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
-	{
+	protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
 		PartitionDef pDef = conf.getStartOfChain().getPartition();
 		ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
 		int numExprs = exprs.size();
@@ -173,8 +153,7 @@ public class PTFOperator extends Operato
 		ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
 		ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
 
-		for(int i=0; i<numExprs; i++)
-		{
+		for(int i=0; i<numExprs; i++) {
 		  PTFExpressionDef exprDef = exprs.get(i);
 			/*
 			 * Why cannot we just use the ExprNodeEvaluator on the column?
@@ -192,29 +171,20 @@ public class PTFOperator extends Operato
 	  newKeys = keyWrapperFactory.getKeyWrapper();
 	}
 
-	protected void processInputPartition() throws HiveException
-	{
+	protected void processInputPartition() throws HiveException {
 	  PTFPartition outPart = executeChain(inputPart);
-	  if ( conf.forWindowing() ) {
-	    executeWindowExprs(outPart);
-	  }
-	  else {
-	    PTFPartitionIterator<Object> pItr = outPart.iterator();
-	    while (pItr.hasNext())
-	    {
-	      Object oRow = pItr.next();
-	      forward(oRow, outputObjInspector);
-	    }
-	  }
+	  PTFPartitionIterator<Object> pItr = outPart.iterator();
+    while (pItr.hasNext()) {
+      Object oRow = pItr.next();
+      forward(oRow, outputObjInspector);
+    }
 	}
 
-	protected void processMapFunction() throws HiveException
-	{
+	protected void processMapFunction() throws HiveException {
 	  PartitionedTableFunctionDef tDef = conf.getStartOfChain();
     PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
     PTFPartitionIterator<Object> pItr = outPart.iterator();
-    while (pItr.hasNext())
-    {
+    while (pItr.hasNext()) {
       Object oRow = pItr.next();
       forward(oRow, outputObjInspector);
     }
@@ -234,8 +204,7 @@ public class PTFOperator extends Operato
 
 
 	@Override
-	public OperatorType getType()
-	{
+	public OperatorType getType() {
 		return OperatorType.PTF;
 	}
 
@@ -250,124 +219,23 @@ public class PTFOperator extends Operato
    * @throws HiveException
    */
   private PTFPartition executeChain(PTFPartition part)
-      throws HiveException
-  {
+      throws HiveException {
     Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
     PTFInputDef iDef = conf.getFuncDef();
-    while (true)
-    {
-      if (iDef instanceof PartitionedTableFunctionDef)
-      {
-        fnDefs.push((PartitionedTableFunctionDef) iDef);
-        iDef = ((PartitionedTableFunctionDef) iDef).getInput();
-      }
-      else
-      {
-        break;
-      }
+
+    while (iDef instanceof PartitionedTableFunctionDef) {
+      fnDefs.push((PartitionedTableFunctionDef) iDef);
+      iDef = ((PartitionedTableFunctionDef) iDef).getInput();
     }
 
     PartitionedTableFunctionDef currFnDef;
-    while (!fnDefs.isEmpty())
-    {
+    while (!fnDefs.isEmpty()) {
       currFnDef = fnDefs.pop();
       part = currFnDef.getTFunction().execute(part);
     }
     return part;
   }
 
-  /**
-   * If WindowingSpec contains any Windowing Expressions or has a
-   * Having condition then these are processed
-   * and forwarded on. Whereas when there is no having or WdwExprs
-   * just forward the rows in the output Partition.
-   *
-   * For e.g. Consider the following query:
-   * <pre>
-   * {@code
-   *  select rank(), lead(rank(),1),...
-   *  from xyz
-   *  ...
-   *  having rank() > 1
-   *  }
-   * </pre>
-   * rank() gets processed as a WdwFn; Its in the oPart(output partition)
-   * argument to executeWindowExprs. Here we first evaluate the having expression.
-   * So the first row of oPart gets filtered out.
-   * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
-   *
-   * @param ptfDesc
-   * @param oPart output partition after Window Fns are processed.
-   * @param op
-   * @throws HiveException
-   */
-  private void executeWindowExprs(PTFPartition oPart)
-      throws HiveException
-  {
-    WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
-    /*
-     * inputOI represents the row with WindowFn results present.
-     * So in the e.g. above it will have a column for 'rank()'
-     */
-    StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
-    /*
-     * outputOI represents the final row with the Windowing Expressions added.
-     * So in the e.g. above it will have a column for 'lead(rank())' in addition to
-     * all columns in inputOI.
-     */
-    StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
-    int numCols = outputOI.getAllStructFieldRefs().size();
-    ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
-    int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
-    Object[] output = new Object[numCols];
-
-    /*
-     * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
-     * we can just forward the row in the oPart partition.
-     */
-    boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
-
-    PTFPartitionIterator<Object> pItr = oPart.iterator();
-    PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
-    while (pItr.hasNext())
-    {
-      int colCnt = 0;
-      Object oRow = pItr.next();
-
-      /*
-       * when there is no Windowing expressions or having;
-       * just forward the Object coming out of the Partition.
-       */
-      if ( forwardRowsUntouched ) {
-        forward(oRow, outputObjInspector);
-        continue;
-      }
-
-      /*
-       * Setup the output row columns in the following order
-       * - the columns in the SelectList processed by the PTF
-       * (ie the Select Exprs that have navigation expressions)
-       * - the columns from the final PTF.
-       */
-
-      if ( wdwExprs != null ) {
-        for (WindowExpressionDef wdwExpr : wdwExprs)
-        {
-          Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
-          output[colCnt++] = newCol;
-        }
-      }
-
-      for(; colCnt < numCols; ) {
-        StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
-        output[colCnt++] =
-            ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
-            field.getFieldObjectInspector());
-      }
-
-      forward(output, outputObjInspector);
-    }
-  }
 
   /**
    * Create a new Partition.
@@ -390,8 +258,7 @@ public class PTFOperator extends Operato
    * @throws HiveException
    */
   public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
-      HiveConf hiveConf, boolean isMapSide) throws HiveException
-  {
+      HiveConf hiveConf, boolean isMapSide) throws HiveException {
     PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
     TableFunctionEvaluator tEval = tabDef.getTFunction();
 
@@ -410,14 +277,12 @@ public class PTFOperator extends Operato
   }
 
   public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
-      PTFPartitionIterator<Object> pItr) throws HiveException
-  {
+      PTFPartitionIterator<Object> pItr) throws HiveException {
     List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
     if (llFnDescs == null) {
       return;
     }
-    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
-    {
+    for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs) {
       GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
           .getGenericUDF();
       llFn.setpItr(pItr);

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Fri Aug 30 19:46:15 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -44,13 +43,12 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
 
 /**
  * Reduce Sink Operator sends output to the reduce stage.
  **/
 public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
-    implements Serializable {
+    implements Serializable, TopNHash.BinaryCollector {
 
   private static final long serialVersionUID = 1L;
 
@@ -90,6 +88,9 @@ public class ReduceSinkOperator extends 
     return inputAlias;
   }
 
+  // Picks the top-N key/value pairs from the input. Can be null.
+  private transient TopNHash reducerHash;
+
   @Override
   protected void initializeOp(Configuration hconf) throws HiveException {
 
@@ -131,6 +132,8 @@ public class ReduceSinkOperator extends 
           .newInstance();
       valueSerializer.initialize(null, valueTableDesc.getProperties());
 
+      reducerHash = createTopKHash();
+
       firstRow = true;
       initializeChildren(hconf);
     } catch (Exception e) {
@@ -139,14 +142,44 @@ public class ReduceSinkOperator extends 
     }
   }
 
+  private TopNHash createTopKHash() {
+    int limit = conf.getTopN();
+    float percent = conf.getTopNMemoryUsage();
+    if (limit < 0 || percent <= 0) {
+      return null;
+    }
+    if (limit == 0) {
+      return TopNHash.create0();
+    }
+    // limit * 64 : compensate for the arrays holding keys, values and hash codes
+    long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
+    if (threshold < 0) {
+      return null;
+    }
+    return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
+  }
+
   transient InspectableObject tempInspectableObject = new InspectableObject();
   transient HiveKey keyWritable = new HiveKey();
-  transient Writable value;
 
   transient StructObjectInspector keyObjectInspector;
   transient StructObjectInspector valueObjectInspector;
   transient ObjectInspector[] partitionObjectInspectors;
 
+  /**
+   * This two dimensional array holds key data and a corresponding Union object
+   * which contains the tag identifying the aggregate expression for distinct columns.
+   *
+   * If there is no distinct expression, cachedKeys simply looks like this:
+   * cachedKeys[0] = [col0][col1]
+   *
+   * With two distinct expressions, a union(tag:key) is attached for each distinct expression:
+   * cachedKeys[0] = [col0][col1][0:dist1]
+   * cachedKeys[1] = [col0][col1][1:dist2]
+   *
+   * In this case, the child GBY evaluates distinct values with an expression like KEY.col2:0.dist1;
+   * see {@link ExprNodeColumnEvaluator}.
+   */
   transient Object[][] cachedKeys;
   transient Object[] cachedValues;
   transient List<List<Integer>> distinctColIndices;
@@ -198,6 +231,7 @@ public class ReduceSinkOperator extends 
   }
 
   @Override
+  @SuppressWarnings("unchecked")
   public void processOp(Object row, int tag) throws HiveException {
     try {
       ObjectInspector rowInspector = inputObjInspectors[tag];
@@ -241,8 +275,6 @@ public class ReduceSinkOperator extends 
       for (int i = 0; i < valueEval.length; i++) {
         cachedValues[i] = valueEval[i].evaluate(row);
       }
-      // Serialize the value
-      value = valueSerializer.serialize(cachedValues, valueObjectInspector);
 
       // Evaluate the keys
       Object[] distributionKeys = new Object[numDistributionKeys];
@@ -267,6 +299,8 @@ public class ReduceSinkOperator extends 
         // no distinct key
         System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
       }
+
+      BytesWritable value = null;
       // Serialize the keys and append the tag
       for (int i = 0; i < cachedKeys.length; i++) {
         if (keyIsText) {
@@ -294,26 +328,85 @@ public class ReduceSinkOperator extends 
           }
         }
         keyWritable.setHashCode(keyHashCode);
-        if (out != null) {
-          out.collect(keyWritable, value);
-          // Since this is a terminal operator, update counters explicitly -
-          // forward is not called
-          if (counterNameToEnum != null) {
-            ++outputRows;
-            if (outputRows % 1000 == 0) {
-              incrCounter(numOutputRowsCntr, outputRows);
-              outputRows = 0;
+
+        if (reducerHash == null) {
+          if (null != out) {
+            collect(keyWritable, value = getValue(row, value));
+          }
+       } else {
+          int index = reducerHash.indexOf(keyWritable);
+          if (index == TopNHash.EXCLUDED) {
+            continue;
+          }
+          value = getValue(row, value);
+          if (index >= 0) {
+            reducerHash.set(index, value);
+          } else {
+            if (index == TopNHash.FORWARD) {
+              collect(keyWritable, value);
+            } else if (index == TopNHash.FLUSH) {
+              LOG.info("Top-N hash is flushed");
+              reducerHash.flush();
+              // We could now retry adding the key/value to the (now flushed) hash,
+              // but for simplicity just forward them.
+              collect(keyWritable, value);
+            } else if (index == TopNHash.DISABLE) {
+              LOG.info("Top-N hash is disabled");
+              reducerHash.flush();
+              collect(keyWritable, value);
+              reducerHash = null;
             }
           }
         }
       }
-    } catch (SerDeException e) {
-      throw new HiveException(e);
-    } catch (IOException e) {
+    } catch (HiveException e) {
+      throw e;
+    } catch (Exception e) {
       throw new HiveException(e);
     }
   }
 
+  public void collect(BytesWritable key, BytesWritable value) throws IOException {
+    // Since this is a terminal operator, update counters explicitly -
+    // forward is not called
+    out.collect(key, value);
+    if (++outputRows % 1000 == 0) {
+      if (counterNameToEnum != null) {
+        incrCounter(numOutputRowsCntr, outputRows);
+      }
+      increaseForward(outputRows);
+      outputRows = 0;
+    }
+  }
+
+  // evaluate value lazily
+  private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
+    if (value != null) {
+      return value;
+    }
+    // Evaluate the value
+    for (int i = 0; i < valueEval.length; i++) {
+      cachedValues[i] = valueEval[i].evaluate(row);
+    }
+    // Serialize the value
+    return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector);
+  }
+
+  @Override
+  protected void closeOp(boolean abort) throws HiveException {
+    if (!abort && reducerHash != null) {
+      try {
+        reducerHash.flush();
+      } catch (IOException e) {
+        throw new HiveException(e);
+      } finally {
+        reducerHash = null;
+      }
+    }
+    reducerHash = null;
+    super.closeOp(abort);
+  }
+
   /**
    * @return the name of the operator
    */
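
The new reducerHash.indexOf(keyWritable) call drives a small protocol: a non-negative result means the key was retained and the (lazily serialized) value should be stored at that slot, while the negative sentinels EXCLUDED, FORWARD, FLUSH and DISABLE mean drop the row, emit it directly, flush the hash then emit, or flush and stop using the hash. A self-contained sketch of that dispatch follows; the constants and methods are stand-ins mirroring the calls shown in the diff, not the real TopNHash class, and the sentinel values are illustrative.

    // Stand-in constants/methods mirroring the dispatch above; values are illustrative.
    class TopNDispatchSketch {
      static final int EXCLUDED = -1, FORWARD = -2, FLUSH = -3, DISABLE = -4;

      interface Sink { void collect(byte[] key, byte[] value); }

      int indexOf(byte[] key) { return FORWARD; }   // stand-in for the top-N hash lookup
      void set(int index, byte[] value) { }         // stand-in: remember the value for a retained key
      void flush() { }                              // stand-in: emit everything buffered so far

      // Returns false when the hash should no longer be used (the DISABLE case).
      boolean process(byte[] key, byte[] value, Sink out) {
        int index = indexOf(key);
        if (index == EXCLUDED) {
          return true;                // outside the current top-N: drop the row
        }
        if (index >= 0) {
          set(index, value);          // retained: keep the serialized value in the hash
        } else if (index == FORWARD) {
          out.collect(key, value);    // not handled by the hash: emit directly
        } else if (index == FLUSH) {
          flush();                    // memory pressure: emit buffered pairs, then this one
          out.collect(key, value);
        } else if (index == DISABLE) {
          flush();                    // hash gave up: drain it, emit this pair, stop using it
          out.collect(key, value);
          return false;
        }
        return true;
      }
    }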

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Fri Aug 30 19:46:15 2013
@@ -124,4 +124,9 @@ public class SelectOperator extends Oper
   public boolean supportUnionRemoveOptimization() {
     return true;
   }
+
+  @Override
+  public boolean acceptLimitPushdown() {
+    return true;
+  }
 }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Fri Aug 30 19:46:15 2013
@@ -35,6 +35,12 @@ public class HiveKey extends BytesWritab
     hashCodeValid = false;
   }
 
+  public HiveKey(byte[] bytes, int hashcode) {
+    super(bytes);
+    myHashCode = hashcode;
+    hashCodeValid = true;
+  }
+
   protected int myHashCode;
 
   public void setHashCode(int myHashCode) {

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Fri Aug 30 19:46:15 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io.orc
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 
 import java.io.IOException;
@@ -47,19 +48,67 @@ public final class OrcFile {
    * prevent the new reader from reading ORC files generated by any released
    * version of Hive.
    */
-  public static final int MAJOR_VERSION = 0;
-  public static final int MINOR_VERSION = 11;
+  public static enum Version {
+    V_0_11("0.11", 0, 11),
+      V_0_12("0.12", 0, 12);
+
+    public static final Version CURRENT = V_0_12;
+
+    private final String name;
+    private final int major;
+    private final int minor;
+
+    private Version(String name, int major, int minor) {
+      this.name = name;
+      this.major = major;
+      this.minor = minor;
+    }
+
+    public static Version byName(String name) {
+      for(Version version: values()) {
+        if (version.name.equals(name)) {
+          return version;
+        }
+      }
+      throw new IllegalArgumentException("Unknown ORC version " + name);
+    }
+
+    /**
+     * Get the human readable name for the version.
+     */
+    public String getName() {
+      return name;
+    }
+
+    /**
+     * Get the major version number.
+     */
+    public int getMajor() {
+      return major;
+    }
+
+    /**
+     * Get the minor version number.
+     */
+    public int getMinor() {
+      return minor;
+    }
+  }
 
   // the table properties that control ORC files
   public static final String COMPRESSION = "orc.compress";
-  static final String DEFAULT_COMPRESSION = "ZLIB";
   public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size";
-  static final String DEFAULT_COMPRESSION_BLOCK_SIZE = "262144";
   public static final String STRIPE_SIZE = "orc.stripe.size";
-  static final String DEFAULT_STRIPE_SIZE = "268435456";
   public static final String ROW_INDEX_STRIDE = "orc.row.index.stride";
-  static final String DEFAULT_ROW_INDEX_STRIDE = "10000";
   public static final String ENABLE_INDEXES = "orc.create.index";
+  public static final String BLOCK_PADDING = "orc.block.padding";
+
+  static final long DEFAULT_STRIPE_SIZE = 256 * 1024 * 1024;
+  static final CompressionKind DEFAULT_COMPRESSION_KIND =
+    CompressionKind.ZLIB;
+  static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
+  static final int DEFAULT_ROW_INDEX_STRIDE = 10000;
+  static final boolean DEFAULT_BLOCK_PADDING = true;
 
   // unused
   private OrcFile() {}
@@ -77,7 +126,145 @@ public final class OrcFile {
   }
 
   /**
-   * Create an ORC file streamFactory.
+   * Options for creating ORC file writers.
+   */
+  public static class WriterOptions {
+    private final Configuration configuration;
+    private FileSystem fileSystemValue = null;
+    private ObjectInspector inspectorValue = null;
+    private long stripeSizeValue = DEFAULT_STRIPE_SIZE;
+    private int rowIndexStrideValue = DEFAULT_ROW_INDEX_STRIDE;
+    private int bufferSizeValue = DEFAULT_BUFFER_SIZE;
+    private boolean blockPaddingValue = DEFAULT_BLOCK_PADDING;
+    private CompressionKind compressValue = DEFAULT_COMPRESSION_KIND;
+    private MemoryManager memoryManagerValue;
+    private Version versionValue;
+
+    WriterOptions(Configuration conf) {
+      configuration = conf;
+      memoryManagerValue = getMemoryManager(conf);
+      String versionName =
+        conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
+      if (versionName == null) {
+        versionValue = Version.CURRENT;
+      } else {
+        versionValue = Version.byName(versionName);
+      }
+    }
+
+    /**
+     * Provide the filesystem for the path, if the client has it available.
+     * If it is not provided, it will be found from the path.
+     */
+    public WriterOptions fileSystem(FileSystem value) {
+      fileSystemValue = value;
+      return this;
+    }
+
+    /**
+     * Set the stripe size for the file. The writer stores the contents of the
+     * stripe in memory until this memory limit is reached and the stripe
+     * is flushed to the HDFS file and the next stripe started.
+     */
+    public WriterOptions stripeSize(long value) {
+      stripeSizeValue = value;
+      return this;
+    }
+
+    /**
+     * Set the distance between entries in the row index. The minimum value is
+     * 1000 to prevent the index from overwhelming the data. If the stride is
+     * set to 0, no indexes will be included in the file.
+     */
+    public WriterOptions rowIndexStride(int value) {
+      rowIndexStrideValue = value;
+      return this;
+    }
+
+    /**
+     * The size of the memory buffers used for compressing and storing the
+     * stripe in memory.
+     */
+    public WriterOptions bufferSize(int value) {
+      bufferSizeValue = value;
+      return this;
+    }
+
+    /**
+     * Sets whether the HDFS blocks are padded to prevent stripes from
+     * straddling blocks. Padding improves locality and thus the speed of
+     * reading, but costs space.
+     */
+    public WriterOptions blockPadding(boolean value) {
+      blockPaddingValue = value;
+      return this;
+    }
+
+    /**
+     * Sets the generic compression that is used to compress the data.
+     */
+    public WriterOptions compress(CompressionKind value) {
+      compressValue = value;
+      return this;
+    }
+
+    /**
+     * A required option that sets the object inspector for the rows. Used
+     * to determine the schema for the file.
+     */
+    public WriterOptions inspector(ObjectInspector value) {
+      inspectorValue = value;
+      return this;
+    }
+
+    /**
+     * Sets the version of the file that will be written.
+     */
+    public WriterOptions version(Version value) {
+      versionValue = value;
+      return this;
+    }
+
+    /**
+     * A package local option to set the memory manager.
+     */
+    WriterOptions memory(MemoryManager value) {
+      memoryManagerValue = value;
+      return this;
+    }
+  }
+
+  /**
+   * Create a default set of write options that can be modified.
+   */
+  public static WriterOptions writerOptions(Configuration conf) {
+    return new WriterOptions(conf);
+  }
+
+  /**
+   * Create an ORC file writer. This is the public interface for creating
+   * writers going forward and new options will only be added to this method.
+   * @param path filename to write to
+   * @param options the options
+   * @return a new ORC file writer
+   * @throws IOException
+   */
+  public static Writer createWriter(Path path,
+                                    WriterOptions opts
+                                    ) throws IOException {
+    FileSystem fs = opts.fileSystemValue == null ?
+      path.getFileSystem(opts.configuration) : opts.fileSystemValue;
+
+    return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
+                          opts.stripeSizeValue, opts.compressValue,
+                          opts.bufferSizeValue, opts.rowIndexStrideValue,
+                          opts.memoryManagerValue, opts.blockPaddingValue,
+                          opts.versionValue);
+  }
+
+  /**
+   * Create an ORC file writer. This method is provided for API backward
+   * compatibility with Hive 0.11.
    * @param fs file system
    * @param path filename to write to
    * @param inspector the ObjectInspector that inspects the rows
@@ -86,7 +273,7 @@ public final class OrcFile {
    * @param bufferSize the number of bytes to compress at once
    * @param rowIndexStride the number of rows between row index entries or
    *                       0 to suppress all indexes
-   * @return a new ORC file streamFactory
+   * @return a new ORC file writer
    * @throws IOException
    */
   public static Writer createWriter(FileSystem fs,
@@ -97,8 +284,14 @@ public final class OrcFile {
                                     CompressionKind compress,
                                     int bufferSize,
                                     int rowIndexStride) throws IOException {
-    return new WriterImpl(fs, path, conf, inspector, stripeSize, compress,
-      bufferSize, rowIndexStride, getMemoryManager(conf));
+    return createWriter(path,
+                        writerOptions(conf)
+                        .fileSystem(fs)
+                        .inspector(inspector)
+                        .stripeSize(stripeSize)
+                        .compress(compress)
+                        .bufferSize(bufferSize)
+                        .rowIndexStride(rowIndexStride));
   }
 
   private static MemoryManager memoryManager = null;
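
The positional createWriter overload is kept for 0.11 compatibility and now just forwards to the new WriterOptions builder. A short usage sketch of the builder API introduced above; the option values are illustrative, and the ObjectInspector and row are placeholders supplied by the caller.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;
    import org.apache.hadoop.hive.ql.io.orc.Writer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    class OrcWriterOptionsUsage {
      // rowInspector and row are placeholders supplied by the caller.
      static void writeOneRow(Path path, ObjectInspector rowInspector, Object row,
                              Configuration conf) throws IOException {
        Writer writer = OrcFile.createWriter(path,
            OrcFile.writerOptions(conf)
                .inspector(rowInspector)            // required: describes the row schema
                .compress(CompressionKind.ZLIB)
                .stripeSize(256L * 1024 * 1024)
                .rowIndexStride(10000));
        writer.addRow(row);
        writer.close();
      }
    }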

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Fri Aug 30 19:46:15 2013
@@ -47,32 +47,20 @@ public class OrcOutputFormat extends Fil
       implements RecordWriter<NullWritable, OrcSerdeRow>,
                  FileSinkOperator.RecordWriter {
     private Writer writer = null;
-    private final FileSystem fs;
     private final Path path;
-    private final Configuration conf;
-    private final long stripeSize;
-    private final int compressionSize;
-    private final CompressionKind compress;
-    private final int rowIndexStride;
-
-    OrcRecordWriter(FileSystem fs, Path path, Configuration conf,
-                    String stripeSize, String compress,
-                    String compressionSize, String rowIndexStride) {
-      this.fs = fs;
+    private final OrcFile.WriterOptions options;
+
+    OrcRecordWriter(Path path, OrcFile.WriterOptions options) {
       this.path = path;
-      this.conf = conf;
-      this.stripeSize = Long.valueOf(stripeSize);
-      this.compress = CompressionKind.valueOf(compress);
-      this.compressionSize = Integer.valueOf(compressionSize);
-      this.rowIndexStride = Integer.valueOf(rowIndexStride);
+      this.options = options;
     }
 
     @Override
     public void write(NullWritable nullWritable,
                       OrcSerdeRow row) throws IOException {
       if (writer == null) {
-        writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
-            stripeSize, compress, compressionSize, rowIndexStride);
+        options.inspector(row.getInspector());
+        writer = OrcFile.createWriter(path, options);
       }
       writer.addRow(row.getRow());
     }
@@ -81,9 +69,8 @@ public class OrcOutputFormat extends Fil
     public void write(Writable row) throws IOException {
       OrcSerdeRow serdeRow = (OrcSerdeRow) row;
       if (writer == null) {
-        writer = OrcFile.createWriter(fs, path, this.conf,
-            serdeRow.getInspector(), stripeSize, compress, compressionSize,
-            rowIndexStride);
+        options.inspector(serdeRow.getInspector());
+        writer = OrcFile.createWriter(path, options);
       }
       writer.addRow(serdeRow.getRow());
     }
@@ -102,8 +89,8 @@ public class OrcOutputFormat extends Fil
         ObjectInspector inspector = ObjectInspectorFactory.
             getStandardStructObjectInspector(new ArrayList<String>(),
                 new ArrayList<ObjectInspector>());
-        writer = OrcFile.createWriter(fs, path, this.conf, inspector,
-            stripeSize, compress, compressionSize, rowIndexStride);
+        options.inspector(inspector);
+        writer = OrcFile.createWriter(path, options);
       }
       writer.close();
     }
@@ -113,9 +100,8 @@ public class OrcOutputFormat extends Fil
   public RecordWriter<NullWritable, OrcSerdeRow>
       getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
                       Progressable reporter) throws IOException {
-    return new OrcRecordWriter(fileSystem,  new Path(name), conf,
-      OrcFile.DEFAULT_STRIPE_SIZE, OrcFile.DEFAULT_COMPRESSION,
-      OrcFile.DEFAULT_COMPRESSION_BLOCK_SIZE, OrcFile.DEFAULT_ROW_INDEX_STRIDE);
+    return new
+      OrcRecordWriter(new Path(name), OrcFile.writerOptions(conf));
   }
 
   @Override
@@ -126,20 +112,42 @@ public class OrcOutputFormat extends Fil
                          boolean isCompressed,
                          Properties tableProperties,
                          Progressable reporter) throws IOException {
-    String stripeSize = tableProperties.getProperty(OrcFile.STRIPE_SIZE,
-        OrcFile.DEFAULT_STRIPE_SIZE);
-    String compression = tableProperties.getProperty(OrcFile.COMPRESSION,
-        OrcFile.DEFAULT_COMPRESSION);
-    String compressionSize =
-      tableProperties.getProperty(OrcFile.COMPRESSION_BLOCK_SIZE,
-        OrcFile.DEFAULT_COMPRESSION_BLOCK_SIZE);
-    String rowIndexStride =
-        tableProperties.getProperty(OrcFile.ROW_INDEX_STRIDE,
-            OrcFile.DEFAULT_ROW_INDEX_STRIDE);
-    if ("false".equals(tableProperties.getProperty(OrcFile.ENABLE_INDEXES))) {
-      rowIndexStride = "0";
+    OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+    if (tableProperties.containsKey(OrcFile.STRIPE_SIZE)) {
+      options.stripeSize(Long.parseLong
+                           (tableProperties.getProperty(OrcFile.STRIPE_SIZE)));
+    }
+
+    if (tableProperties.containsKey(OrcFile.COMPRESSION)) {
+      options.compress(CompressionKind.valueOf
+                           (tableProperties.getProperty(OrcFile.COMPRESSION)));
+    }
+
+    if (tableProperties.containsKey(OrcFile.COMPRESSION_BLOCK_SIZE)) {
+      options.bufferSize(Integer.parseInt
+                         (tableProperties.getProperty
+                            (OrcFile.COMPRESSION_BLOCK_SIZE)));
+    }
+
+    if (tableProperties.containsKey(OrcFile.ROW_INDEX_STRIDE)) {
+      options.rowIndexStride(Integer.parseInt
+                             (tableProperties.getProperty
+                              (OrcFile.ROW_INDEX_STRIDE)));
     }
-    return new OrcRecordWriter(path.getFileSystem(conf), path, conf,
-      stripeSize, compression, compressionSize, rowIndexStride);
+
+    if (tableProperties.containsKey(OrcFile.ENABLE_INDEXES)) {
+      if ("false".equals(tableProperties.getProperty
+                         (OrcFile.ENABLE_INDEXES))) {
+        options.rowIndexStride(0);
+      }
+    }
+
+    if (tableProperties.containsKey(OrcFile.BLOCK_PADDING)) {
+      options.blockPadding(Boolean.parseBoolean
+                           (tableProperties.getProperty
+                            (OrcFile.BLOCK_PADDING)));
+    }
+
+    return new OrcRecordWriter(path, options);
   }
 }

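The OrcOutputFormat change above reduces table-property handling to: copy a property into WriterOptions only when the table actually sets it, and otherwise fall through to the built-in defaults. A short sketch of the same pattern, assuming the OrcFile property-name constants are accessible from the caller; the class and helper names are illustrative.

    // Sketch only: mirrors the mapping done in getHiveRecordWriter above.
    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
    import org.apache.hadoop.hive.ql.io.orc.OrcFile;

    public class OrcTablePropertiesExample {
      // A property overrides the default only when it is present on the table.
      static OrcFile.WriterOptions fromTableProperties(Configuration conf,
                                                       Properties props) {
        OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
        if (props.containsKey(OrcFile.COMPRESSION)) {
          options.compress(
              CompressionKind.valueOf(props.getProperty(OrcFile.COMPRESSION)));
        }
        if (props.containsKey(OrcFile.ROW_INDEX_STRIDE)) {
          options.rowIndexStride(
              Integer.parseInt(props.getProperty(OrcFile.ROW_INDEX_STRIDE)));
        }
        // Unset properties fall through to the defaults baked into WriterOptions.
        return options;
      }
    }
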
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Fri Aug 30 19:46:15 2013
@@ -248,11 +248,13 @@ final class ReaderImpl implements Reader
       if (version.size() >= 2) {
         minor = version.get(1);
       }
-      if (major > OrcFile.MAJOR_VERSION ||
-          (major == OrcFile.MAJOR_VERSION && minor > OrcFile.MINOR_VERSION)) {
-        log.warn("ORC file " + path + " was written by a future Hive version " +
-            versionString(version) + ". This file may not be readable by " +
-            "this version of Hive.");
+      if (major > OrcFile.Version.CURRENT.getMajor() ||
+          (major == OrcFile.Version.CURRENT.getMajor() &&
+           minor > OrcFile.Version.CURRENT.getMinor())) {
+        log.warn("ORC file " + path +
+                 " was written by a future Hive version " +
+                 versionString(version) +
+                 ". This file may not be readable by this version of Hive.");
       }
     }
   }

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Fri Aug 30 19:46:15 2013
@@ -82,13 +82,18 @@ class WriterImpl implements Writer, Memo
   private static final int HDFS_BUFFER_SIZE = 256 * 1024;
   private static final int MIN_ROW_INDEX_STRIDE = 1000;
 
+  // HDFS requires blocks < 2GB and multiples of 512, so pick 1.5GB
+  private static final long MAX_BLOCK_SIZE = 1536 * 1024 * 1024;
+
   private final FileSystem fs;
   private final Path path;
   private final long stripeSize;
   private final int rowIndexStride;
   private final CompressionKind compress;
   private final CompressionCodec codec;
+  private final boolean addBlockPadding;
   private final int bufferSize;
+  private final long blockSize;
   // the streams that make up the current stripe
   private final Map<StreamName, BufferedStream> streams =
     new TreeMap<StreamName, BufferedStream>();
@@ -113,6 +118,7 @@ class WriterImpl implements Writer, Memo
       OrcProto.RowIndex.newBuilder();
   private final boolean buildIndex;
   private final MemoryManager memoryManager;
+  private final OrcFile.Version version;
 
   private final Configuration conf;
 
@@ -124,11 +130,17 @@ class WriterImpl implements Writer, Memo
              CompressionKind compress,
              int bufferSize,
              int rowIndexStride,
-             MemoryManager memoryManager) throws IOException {
+             MemoryManager memoryManager,
+             boolean addBlockPadding,
+             OrcFile.Version version) throws IOException {
     this.fs = fs;
     this.path = path;
     this.conf = conf;
     this.stripeSize = stripeSize;
+    this.version = version;
+    this.addBlockPadding = addBlockPadding;
+    // pick a large block size to minimize stripe over- or under-hang at block boundaries
+    this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize);
     this.compress = compress;
     this.bufferSize = bufferSize;
     this.rowIndexStride = rowIndexStride;
@@ -249,6 +261,19 @@ class WriterImpl implements Writer, Memo
     }
 
     /**
+     * Get the number of bytes that will be written to the output. Assumes
+     * the stream has already been flushed.
+     * @return the number of bytes
+     */
+    public long getOutputSize() {
+      long result = 0;
+      for(ByteBuffer buffer: output) {
+        result += buffer.remaining();
+      }
+      return result;
+    }
+
+    /**
      * Write the saved compressed buffers to the OutputStream.
      * @param out the stream to write to
      * @throws IOException
@@ -359,6 +384,13 @@ class WriterImpl implements Writer, Memo
     public Configuration getConfiguration() {
       return conf;
     }
+
+    /**
+     * Get the version of the file to write.
+     */
+    public OrcFile.Version getVersion() {
+      return version;
+    }
   }
 
   /**
@@ -442,20 +474,7 @@ class WriterImpl implements Writer, Memo
     }
 
     boolean isNewWriteFormat(StreamFactory writer) {
-      String writeFormat = writer.getConfiguration().get(
-          HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
-      if (writeFormat == null) {
-        LOG.warn("ORC write format not defined. Using 0.12 ORC write format.");
-        return true;
-      }
-      if (writeFormat
-          .equals(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.defaultVal)) {
-        LOG.info("Using 0.11 ORC write format.");
-        return false;
-      }
-
-      LOG.info("Using 0.12 ORC write format.");
-      return true;
+      return writer.getVersion() != OrcFile.Version.V_0_11;
     }
 
     /**
@@ -874,9 +893,10 @@ class WriterImpl implements Writer, Memo
       // Set the flag indicating whether or not to use dictionary encoding
       // based on whether or not the fraction of distinct keys over number of
       // non-null rows is less than the configured threshold
-      useDictionaryEncoding = rows.size() > 0 &&
-        (float)(dictionary.size()) / rows.size() <=
-          dictionaryKeySizeThreshold;
+      useDictionaryEncoding =
+        (!isDirectV2) || (rows.size() > 0 &&
+                          (float)(dictionary.size()) / rows.size() <=
+                            dictionaryKeySizeThreshold);
       final int[] dumpOrder = new int[dictionary.size()];
 
       if (useDictionaryEncoding) {
@@ -1600,12 +1620,11 @@ class WriterImpl implements Writer, Memo
   private void ensureWriter() throws IOException {
     if (rawWriter == null) {
       rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
-        fs.getDefaultReplication(),
-          Math.min(stripeSize * 2L, Integer.MAX_VALUE));
+                            fs.getDefaultReplication(), blockSize);
       rawWriter.writeBytes(OrcFile.MAGIC);
       headerLength = rawWriter.getPos();
       writer = new OutStream("metadata", bufferSize, codec,
-        new DirectStream(rawWriter));
+                             new DirectStream(rawWriter));
       protobufWriter = CodedOutputStream.newInstance(writer);
     }
   }
@@ -1621,43 +1640,70 @@ class WriterImpl implements Writer, Memo
       createRowIndexEntry();
     }
     if (rowsInStripe != 0) {
+
+      // finalize the data for the stripe
       int requiredIndexEntries = rowIndexStride == 0 ? 0 :
           (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
       OrcProto.StripeFooter.Builder builder =
           OrcProto.StripeFooter.newBuilder();
       treeWriter.writeStripe(builder, requiredIndexEntries);
-      long start = rawWriter.getPos();
-      long section = start;
-      long indexEnd = start;
+      long indexSize = 0;
+      long dataSize = 0;
       for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
         BufferedStream stream = pair.getValue();
         if (!stream.isSuppressed()) {
           stream.flush();
-          stream.spillTo(rawWriter);
-          long end = rawWriter.getPos();
           StreamName name = pair.getKey();
+          long streamSize = pair.getValue().getOutputSize();
           builder.addStreams(OrcProto.Stream.newBuilder()
-              .setColumn(name.getColumn())
-              .setKind(name.getKind())
-              .setLength(end-section));
-          section = end;
+                             .setColumn(name.getColumn())
+                             .setKind(name.getKind())
+                             .setLength(streamSize));
           if (StreamName.Area.INDEX == name.getArea()) {
-            indexEnd = end;
+            indexSize += streamSize;
+          } else {
+            dataSize += streamSize;
           }
         }
+      }
+      OrcProto.StripeFooter footer = builder.build();
+
+      // Do we need to pad the file so the stripe doesn't straddle a block
+      // boundary?
+      long start = rawWriter.getPos();
+      long stripeSize = indexSize + dataSize + footer.getSerializedSize();
+      if (addBlockPadding &&
+          stripeSize < blockSize &&
+          (start % blockSize) + stripeSize > blockSize) {
+        long padding = blockSize - (start % blockSize);
+        byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+        start += padding;
+        while (padding > 0) {
+          int writeLen = (int) Math.min(padding, pad.length);
+          rawWriter.write(pad, 0, writeLen);
+          padding -= writeLen;
+        }
+      }
+
+      // write out the data streams
+      for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+        BufferedStream stream = pair.getValue();
+        if (!stream.isSuppressed()) {
+          stream.spillTo(rawWriter);
+        }
         stream.clear();
       }
-      builder.build().writeTo(protobufWriter);
+      footer.writeTo(protobufWriter);
       protobufWriter.flush();
       writer.flush();
-      long end = rawWriter.getPos();
+      long footerLength = rawWriter.getPos() - start - dataSize - indexSize;
       OrcProto.StripeInformation dirEntry =
           OrcProto.StripeInformation.newBuilder()
               .setOffset(start)
-              .setIndexLength(indexEnd - start)
-              .setDataLength(section - indexEnd)
               .setNumberOfRows(rowsInStripe)
-              .setFooterLength(end - section).build();
+              .setIndexLength(indexSize)
+              .setDataLength(dataSize)
+              .setFooterLength(footerLength).build();
       stripes.add(dirEntry);
       rowCount += rowsInStripe;
       rowsInStripe = 0;
@@ -1704,7 +1750,8 @@ class WriterImpl implements Writer, Memo
         .setName(entry.getKey()).setValue(entry.getValue()));
     }
     long startPosn = rawWriter.getPos();
-    builder.build().writeTo(protobufWriter);
+    OrcProto.Footer footer = builder.build();
+    footer.writeTo(protobufWriter);
     protobufWriter.flush();
     writer.flush();
     return (int) (rawWriter.getPos() - startPosn);
@@ -1716,8 +1763,8 @@ class WriterImpl implements Writer, Memo
         .setCompression(writeCompressionKind(compress))
         .setFooterLength(footerLength)
         .setMagic(OrcFile.MAGIC)
-        .addVersion(OrcFile.MAJOR_VERSION)
-        .addVersion(OrcFile.MINOR_VERSION);
+        .addVersion(version.getMajor())
+        .addVersion(version.getMinor());
     if (compress != CompressionKind.NONE) {
       builder.setCompressionBlockSize(bufferSize);
     }

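The padding arithmetic added to flushStripe above can be read in isolation: if the stripe fits within a block but would straddle the next block boundary, pad up to that boundary before writing. A standalone sketch of the same calculation (method and parameter names are illustrative, not from the patch):

    // Sketch only: reproduces the block-padding decision used above.
    // Returns the number of padding bytes to write before the stripe.
    static long paddingBeforeStripe(long start, long stripeSize, long blockSize,
                                    boolean addBlockPadding) {
      if (addBlockPadding &&
          stripeSize < blockSize &&
          (start % blockSize) + stripeSize > blockSize) {
        return blockSize - (start % blockSize);   // pad up to the next boundary
      }
      return 0;                                    // stripe fits; no padding needed
    }

For example, with a 256MB block size, a 100MB stripe starting at offset 200MB would straddle the boundary at 256MB, so 56MB of zero padding is written first and the stripe begins on the next block.
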
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Fri Aug 30 19:46:15 2013
@@ -68,7 +68,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -223,12 +222,6 @@ public final class ColumnPrunerProcFacto
           }
         }
       }
-      if ( tDef.getWindowExpressions() != null ) {
-        for(WindowExpressionDef expr : tDef.getWindowExpressions()) {
-          ExprNodeDesc exprNode = expr.getExprNode();
-          Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
-        }
-      }
      if(tDef.getPartition() != null){
          for(PTFExpressionDef col : tDef.getPartition().getExpressions()){
            ExprNodeDesc exprNode = col.getExprNode();

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Fri Aug 30 19:46:15 2013
@@ -125,7 +125,7 @@ public class GenMRFileSink1 implements N
         // no need of merging if the move is to a local file system
         MoveTask mvTask = (MoveTask) findMoveTask(mvTasks, fsOp);
 
-        if (isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
+        if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
           addStatsTask(fsOp, mvTask, currTask, parseCtx.getConf());
         }
 

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Aug 30 19:46:15 2013
@@ -110,6 +110,9 @@ public class Optimizer {
         !HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
       transformations.add(new CorrelationOptimizer());
     }
+    if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
+      transformations.add(new LimitPushdownOptimizer());
+    }
     transformations.add(new SimpleFetchOptimizer());  // must be called last
   }
 

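A minimal sketch of enabling the new limit-pushdown pass programmatically; the 0.3f value is an arbitrary example, and only the HIVELIMITPUSHDOWNMEMORYUSAGE constant comes from this change.

    // Sketch only: any positive value makes Optimizer.initialize add
    // LimitPushdownOptimizer to its transformation list, per the hunk above.
    import org.apache.hadoop.hive.conf.HiveConf;

    HiveConf hiveConf = new HiveConf();
    hiveConf.setFloat(HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE.varname, 0.3f);
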
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Fri Aug 30 19:46:15 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.hive.ql.plan.PT
 import org.apache.hadoop.hive.ql.plan.PTFDesc.RangeBoundaryDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.ValueBoundaryDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFrameDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
 import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -200,83 +199,26 @@ public class PTFTranslator {
     /*
      * set outputFromWdwFnProcessing
      */
-    if (windowFunctions.size() > 0) {
-      ArrayList<String> aliases = new ArrayList<String>();
-      ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-      for (WindowFunctionDef wFnDef : windowFunctions) {
-        aliases.add(wFnDef.getAlias());
-        if (wFnDef.isPivotResult()) {
-          fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
-        } else {
-          fieldOIs.add(wFnDef.getOI());
-        }
-      }
-      PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
-      StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
-          aliases, fieldOIs);
-      tFn.setWdwProcessingOutputOI(wdwOutOI);
-      RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef, false);
-      ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
-      wdwTFnDef.setOutputFromWdwFnProcessing(wdwOutShape);
-    }
-    else {
-      wdwTFnDef.setOutputFromWdwFnProcessing(inpShape);
-    }
-
-    /*
-     * process Wdw expressions
-     */
-    ShapeDetails wdwOutShape = wdwTFnDef.getOutputFromWdwFnProcessing();
-    ArrayList<WindowExpressionDef> windowExpressions = new ArrayList<WindowExpressionDef>();
-    if (wdwSpec.getWindowExpressions() != null) {
-      for (WindowExpressionSpec expr : wdwSpec.getWindowExpressions()) {
-        if (!(expr instanceof WindowFunctionSpec)) {
-          try {
-            PTFExpressionDef eDef = buildExpressionDef(wdwOutShape, expr.getExpression());
-            WindowExpressionDef wdwEDef = new WindowExpressionDef(eDef);
-            wdwEDef.setAlias(expr.getAlias());
-            windowExpressions.add(wdwEDef);
-          } catch (HiveException he) {
-            throw new SemanticException(he);
-          }
-        }
+    ArrayList<String> aliases = new ArrayList<String>();
+    ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+    for (WindowFunctionDef wFnDef : windowFunctions) {
+      aliases.add(wFnDef.getAlias());
+      if (wFnDef.isPivotResult()) {
+        fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
+      } else {
+        fieldOIs.add(wFnDef.getOI());
       }
-      wdwTFnDef.setWindowExpressions(windowExpressions);
-    }
-
-    /*
-     * set outputOI
-     */
-    if (windowExpressions.size() > 0) {
-      ArrayList<String> aliases = new ArrayList<String>();
-      ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
-      for (WindowExpressionDef wEDef : windowExpressions) {
-        aliases.add(wEDef.getAlias());
-        fieldOIs.add(wEDef.getOI());
-      }
-      PTFTranslator.addInputColumnsToList(wdwOutShape, aliases, fieldOIs);
-      StructObjectInspector outOI = ObjectInspectorFactory.getStandardStructObjectInspector(
-          aliases, fieldOIs);
-      RowResolver outRR = buildRowResolverForWindowing(wdwTFnDef, true);
-      ShapeDetails outShape = setupShape(outOI, null, outRR);
-      wdwTFnDef.setOutputShape(outShape);
-    }
-    else {
-      wdwTFnDef.setOutputShape(copyShape(wdwOutShape));
     }
+    PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
+    StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+        aliases, fieldOIs);
+    tFn.setWdwProcessingOutputOI(wdwOutOI);
+    RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef);
+    ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
+    wdwTFnDef.setOutputShape(wdwOutShape);
 
     tFn.setupOutputOI();
 
-    /*
-     * If we have windowExpressions then we convert to Std. Object to process;
-     * we just stream these rows; no need to put in an output Partition.
-     */
-    if (windowExpressions.size() > 0) {
-      StructObjectInspector oi = (StructObjectInspector)
-          ObjectInspectorUtils.getStandardObjectInspector(wdwTFnDef.getOutputShape().getOI());
-      wdwTFnDef.getOutputShape().setOI(oi);
-    }
-
     return ptfDesc;
   }
 
@@ -949,23 +891,10 @@ public class PTFTranslator {
     return rwsch;
   }
 
-  protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def,
-      boolean addWdwExprs) throws SemanticException {
+  protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def)
+      throws SemanticException {
     RowResolver rr = new RowResolver();
     HashMap<String, WindowExpressionSpec> aliasToExprMap = windowingSpec.getAliasToWdwExpr();
-    /*
-     * add Window Expressions
-     */
-    if (addWdwExprs) {
-      for (WindowExpressionDef wEDef : def.getWindowExpressions()) {
-        ASTNode ast = aliasToExprMap.get(wEDef.getAlias()).getExpression();
-        ColumnInfo cInfo = new ColumnInfo(wEDef.getAlias(),
-            TypeInfoUtils.getTypeInfoFromObjectInspector(wEDef.getOI()),
-            null,
-            false);
-        rr.putExpression(ast, cInfo);
-      }
-    }
 
     /*
      * add Window Functions

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Fri Aug 30 19:46:15 2013
@@ -67,19 +67,6 @@ public class WindowingSpec {
     windowSpecs.put(name, wdwSpec);
   }
 
-  public void addExpression(ASTNode expr, String alias) {
-    windowExpressions = windowExpressions == null ?
-        new ArrayList<WindowExpressionSpec>() : windowExpressions;
-    aliasToWdwExpr = aliasToWdwExpr == null ?
-        new HashMap<String, WindowExpressionSpec>() : aliasToWdwExpr;
-    WindowExpressionSpec wExprSpec = new WindowExpressionSpec();
-    wExprSpec.setAlias(alias);
-    wExprSpec.setExpression(expr);
-
-    windowExpressions.add(wExprSpec);
-    aliasToWdwExpr.put(alias, wExprSpec);
-  }
-
   public void addWindowFunction(WindowFunctionSpec wFn) {
     windowExpressions = windowExpressions == null ?
         new ArrayList<WindowExpressionSpec>() : windowExpressions;

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java Fri Aug 30 19:46:15 2013
@@ -239,28 +239,8 @@ public class PTFDesc extends AbstractOpe
   }
 
   public static class WindowTableFunctionDef extends PartitionedTableFunctionDef {
-    ArrayList<WindowExpressionDef> windowExpressions;
     ArrayList<WindowFunctionDef> windowFunctions;
-    /*
-     * this shape omits the non WdwFunction Expressions. Expr Evaluators for the Window Expressions is based on this
-     * shape, so they can refer to the Wdw Function values.
-     * @note: this will eventually be removed, as plan is to push Wdw expression processing to separate Select Op after
-     * PTF Op.
-     */
-    ShapeDetails outputFromWdwFnProcessing;
-
-    public ArrayList<WindowExpressionDef> getWindowExpressions() {
-      return windowExpressions;
-    }
-    public void setWindowExpressions(ArrayList<WindowExpressionDef> windowExpressions) {
-      this.windowExpressions = windowExpressions;
-    }
-    public ShapeDetails getOutputFromWdwFnProcessing() {
-      return outputFromWdwFnProcessing;
-    }
-    public void setOutputFromWdwFnProcessing(ShapeDetails outputFromWdwFnProcessing) {
-      this.outputFromWdwFnProcessing = outputFromWdwFnProcessing;
-    }
+
     public ArrayList<WindowFunctionDef> getWindowFunctions() {
       return windowFunctions;
     }