Posted to commits@hive.apache.org by gu...@apache.org on 2013/08/30 21:46:17 UTC
svn commit: r1519056 [2/3] - in /hive/branches/tez: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
hbase-handler/src/test/queries/positive/
hbase-handler/src/test/results/positive/ hcatalog/
hcatalog/build-support/ant/ hcatalog/core/ hcatalog/co...
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreDirectSql.java Fri Aug 30 19:46:15 2013
@@ -79,23 +79,39 @@ class MetaStoreDirectSql {
public List<Partition> getPartitionsViaSqlFilter(
String dbName, String tblName, List<String> partNames) throws MetaException {
String list = repeat(",?", partNames.size()).substring(1);
- return getPartitionsViaSqlFilterInternal(dbName, tblName,
- "and PARTITIONS.PART_NAME in (" + list + ")" , partNames, new ArrayList<String>());
+ return getPartitionsViaSqlFilterInternal(dbName, tblName, null,
+ "and PARTITIONS.PART_NAME in (" + list + ")", partNames, new ArrayList<String>());
}
/**
* Gets partitions by using direct SQL queries.
- * @param dbName Metastore db name.
- * @param tblName Metastore table name.
+ * @param table The table.
* @param parser The parsed filter from which the SQL filter will be generated.
* @return List of partitions.
*/
- public List<Partition> getPartitionsViaSqlFilter(Table table, String dbName,
- String tblName, FilterParser parser) throws MetaException {
+ public List<Partition> getPartitionsViaSqlFilter(
+ Table table, FilterParser parser) throws MetaException {
List<String> params = new ArrayList<String>(), joins = new ArrayList<String>();
String sqlFilter = (parser == null) ? null
: PartitionFilterGenerator.generateSqlFilter(table, parser.tree, params, joins);
- return getPartitionsViaSqlFilterInternal(dbName, tblName, sqlFilter, params, joins);
+ return getPartitionsViaSqlFilterInternal(table.getDbName(), table.getTableName(),
+ isViewTable(table), sqlFilter, params, joins);
+ }
+
+ private static Boolean isViewTable(Table t) {
+ return t.isSetTableType() ?
+ t.getTableType().equals(TableType.VIRTUAL_VIEW.toString()) : null;
+ }
+
+ private boolean isViewTable(String dbName, String tblName) throws MetaException {
+ String queryText = "select TBL_TYPE from TBLS" +
+ " inner join DBS on TBLS.DB_ID = DBS.DB_ID " +
+ " where TBLS.TBL_NAME = ? and DBS.NAME = ?";
+ Object[] params = new Object[] { tblName, dbName };
+ Query query = pm.newQuery("javax.jdo.query.SQL", queryText);
+ query.setUnique(true);
+ Object result = query.executeWithArray(params);
+ return (result != null) && result.toString().equals(TableType.VIRTUAL_VIEW.toString());
}
/**
@@ -103,14 +119,16 @@ class MetaStoreDirectSql {
* queries created by DN retrieving stuff for each object individually.
* @param dbName Metastore db name.
* @param tblName Metastore table name.
+ * @param isView Whether the table is a view. Can be passed as null if not immediately
+ * known; in that case this method will look it up only if necessary.
* @param sqlFilter SQL filter to use. Better be SQL92-compliant. Can be null.
* @param paramsForFilter params for ?-s in SQL filter text. Params must be in order.
* @param joinsForFilter if the filter needs additional join statement, they must be in
* this list. Better be SQL92-compliant.
* @return List of partition objects. FieldSchema is currently not populated.
*/
- private List<Partition> getPartitionsViaSqlFilterInternal(String dbName,
- String tblName, String sqlFilter, List<String> paramsForFilter,
+ private List<Partition> getPartitionsViaSqlFilterInternal(String dbName, String tblName,
+ Boolean isView, String sqlFilter, List<String> paramsForFilter,
List<String> joinsForFilter) throws MetaException {
boolean doTrace = LOG.isDebugEnabled();
// Get all simple fields for partitions and related objects, which we can map one-on-one.
@@ -191,9 +209,15 @@ class MetaStoreDirectSql {
Long sdId = (Long)fields[1];
Long colId = (Long)fields[2];
Long serdeId = (Long)fields[3];
+ // A partition must have either everything set, or nothing set if it's a view.
if (sdId == null || colId == null || serdeId == null) {
- throw new MetaException("Unexpected null for one of the IDs, SD " + sdId
- + ", column " + colId + ", serde " + serdeId);
+ if (isView == null) {
+ isView = isViewTable(dbName, tblName);
+ }
+ if ((sdId != null || colId != null || serdeId != null) || !isView) {
+ throw new MetaException("Unexpected null for one of the IDs, SD " + sdId + ", column "
+ + colId + ", serde " + serdeId + " for a " + (isView ? "" : "non-") + "view");
+ }
}
Partition part = new Partition();
@@ -207,6 +231,9 @@ class MetaStoreDirectSql {
if (fields[5] != null) part.setLastAccessTime((Integer)fields[5]);
partitions.put(partitionId, part);
+ if (sdId == null) continue; // Probably a view.
+ assert colId != null && serdeId != null;
+
// We assume each partition has a unique SD.
StorageDescriptor sd = new StorageDescriptor();
StorageDescriptor oldSd = sds.put(sdId, sd);
@@ -257,10 +284,6 @@ class MetaStoreDirectSql {
(System.nanoTime() - queryTime) / 1000000.0 + "ms, the query is [ " + queryText + "]");
}
- // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
- String sdIds = trimCommaList(sdSb), serdeIds = trimCommaList(serdeSb),
- colIds = trimCommaList(colsSb);
-
// Now get all the one-to-many things. Start with partitions.
queryText = "select PART_ID, PARAM_KEY, PARAM_VALUE from PARTITION_PARAMS where PART_ID in ("
+ partIds + ") and PARAM_KEY is not null order by PART_ID asc";
@@ -276,6 +299,14 @@ class MetaStoreDirectSql {
t.addToValues((String)fields[1]);
}});
+ // Prepare IN (blah) lists for the following queries. Cut off the final ','s.
+ if (sdSb.length() == 0) {
+ assert serdeSb.length() == 0 && colsSb.length() == 0;
+ return orderedResult; // No SDs, probably a view.
+ }
+ String sdIds = trimCommaList(sdSb), serdeIds = trimCommaList(serdeSb),
+ colIds = trimCommaList(colsSb);
+
// Get all the stuff for SD. Don't do empty-list check - we expect partitions to have SDs.
queryText = "select SD_ID, PARAM_KEY, PARAM_VALUE from SD_PARAMS where SD_ID in ("
+ sdIds + ") and PARAM_KEY is not null order by SD_ID asc";
@@ -341,7 +372,7 @@ class MetaStoreDirectSql {
if (currentListId == null || fieldsListId != currentListId) {
currentList = new ArrayList<String>();
currentListId = fieldsListId;
- t.getSkewedInfo().addToSkewedColValues(currentList); // TODO#: here
+ t.getSkewedInfo().addToSkewedColValues(currentList);
}
currentList.add((String)fields[2]);
}
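The isView flag threaded through these methods is a tri-state Boolean: TRUE or FALSE when the caller already knows the table type (via isViewTable(Table) above), and null when it does not, in which case the extra TBLS/DBS lookup is performed only the first time a partition row with missing SD/column/SerDe ids is encountered. A minimal standalone sketch of that lazy-resolution idiom (names are illustrative, not the metastore API):

import java.util.concurrent.Callable;

// Illustrative only: defer an expensive boolean lookup until it is actually needed.
class LazyViewCheckSketch {
  private Boolean isView;                        // null = not yet determined

  LazyViewCheckSketch(Boolean knownValue) {      // pass null when the caller does not know
    this.isView = knownValue;
  }

  boolean isView(Callable<Boolean> lookup) throws Exception {
    if (isView == null) {
      isView = lookup.call();                    // e.g. one extra SQL query against TBLS
    }
    return isView;
  }
}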
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Fri Aug 30 19:46:15 2013
@@ -1659,9 +1659,17 @@ public class ObjectStore implements RawS
@Override
public List<Partition> getPartitionsByNames(String dbName, String tblName,
List<String> partNames) throws MetaException, NoSuchObjectException {
+ return getPartitionsByNamesInternal(dbName, tblName, partNames, true, true);
+ }
+
+ protected List<Partition> getPartitionsByNamesInternal(String dbName, String tblName,
+ List<String> partNames, boolean allowSql, boolean allowJdo)
+ throws MetaException, NoSuchObjectException {
+ assert allowSql || allowJdo;
boolean doTrace = LOG.isDebugEnabled();
List<Partition> results = null;
- boolean doUseDirectSql = HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
+ boolean doUseDirectSql = allowSql
+ && HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
boolean success = false;
try {
@@ -1671,7 +1679,13 @@ public class ObjectStore implements RawS
try {
results = directSql.getPartitionsViaSqlFilter(dbName, tblName, partNames);
} catch (Exception ex) {
- LOG.error("Direct SQL failed, falling back to ORM", ex);
+ LOG.error("Direct SQL failed" + (allowJdo ? ", falling back to ORM" : ""), ex);
+ if (!allowJdo) {
+ if (ex instanceof MetaException) {
+ throw (MetaException)ex;
+ }
+ throw new MetaException(ex.getMessage());
+ }
doUseDirectSql = false;
rollbackTransaction();
start = doTrace ? System.nanoTime() : 0;
@@ -1734,9 +1748,16 @@ public class ObjectStore implements RawS
@Override
public List<Partition> getPartitionsByFilter(String dbName, String tblName,
String filter, short maxParts) throws MetaException, NoSuchObjectException {
+ return getPartitionsByFilterInternal(dbName, tblName, filter, maxParts, true, true);
+ }
+
+ protected List<Partition> getPartitionsByFilterInternal(String dbName, String tblName,
+ String filter, short maxParts, boolean allowSql, boolean allowJdo)
+ throws MetaException, NoSuchObjectException {
+ assert allowSql || allowJdo;
boolean doTrace = LOG.isDebugEnabled();
// There's no portable SQL limit. It doesn't make a lot of sense w/o offset anyway.
- boolean doUseDirectSql = (maxParts < 0)
+ boolean doUseDirectSql = allowSql && (maxParts < 0)
&& HiveConf.getBoolVar(getConf(), ConfVars.METASTORE_TRY_DIRECT_SQL);
dbName = dbName.toLowerCase();
tblName = tblName.toLowerCase();
@@ -1755,9 +1776,15 @@ public class ObjectStore implements RawS
if (doUseDirectSql) {
try {
Table table = convertToTable(mtable);
- results = directSql.getPartitionsViaSqlFilter(table, dbName, tblName, parser);
+ results = directSql.getPartitionsViaSqlFilter(table, parser);
} catch (Exception ex) {
- LOG.error("Direct SQL failed, falling back to ORM", ex);
+ LOG.error("Direct SQL failed" + (allowJdo ? ", falling back to ORM" : ""), ex);
+ if (!allowJdo) {
+ if (ex instanceof MetaException) {
+ throw (MetaException)ex;
+ }
+ throw new MetaException(ex.getMessage());
+ }
doUseDirectSql = false;
rollbackTransaction();
start = doTrace ? System.nanoTime() : 0;
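Both *Internal methods above share the same shape: try the direct-SQL path when allowSql is set, and depending on allowJdo either fall back to the JDO/ORM path or rethrow the failure as a MetaException. A condensed sketch of that control flow (illustrative helper, not the actual ObjectStore code; transaction rollback and tracing are omitted):

import java.util.concurrent.Callable;

import org.apache.hadoop.hive.metastore.api.MetaException;

// Sketch of the allowSql/allowJdo fallback used by getPartitionsByNamesInternal
// and getPartitionsByFilterInternal above.
class DirectSqlFallbackSketch {
  static <T> T get(boolean allowSql, boolean allowJdo,
                   Callable<T> directSql, Callable<T> jdo) throws Exception {
    assert allowSql || allowJdo;
    if (allowSql) {
      try {
        return directSql.call();                 // fast, hand-written SQL path
      } catch (Exception ex) {
        if (!allowJdo) {                         // SQL-only caller: surface the failure
          throw (ex instanceof MetaException)
              ? (MetaException) ex : new MetaException(ex.getMessage());
        }
        // otherwise fall through to the ORM path (the real code also rolls back
        // the transaction and restarts the trace timer here)
      }
    }
    return jdo.call();                           // portable DataNucleus/JDO path
  }
}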
Modified: hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java (original)
+++ hive/branches/tez/metastore/src/java/org/apache/hadoop/hive/metastore/RetryingRawStore.java Fri Aug 30 19:46:15 2013
@@ -23,7 +23,9 @@ import java.lang.reflect.InvocationTarge
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
+import java.util.List;
+import org.apache.commons.lang.ClassUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -70,8 +72,19 @@ public class RetryingRawStore implements
RetryingRawStore handler = new RetryingRawStore(hiveConf, conf, baseClass, id);
- return (RawStore) Proxy.newProxyInstance(RetryingRawStore.class.getClassLoader()
- , baseClass.getInterfaces(), handler);
+ // Look for interfaces on both the class and all base classes.
+ return (RawStore) Proxy.newProxyInstance(RetryingRawStore.class.getClassLoader(),
+ getAllInterfaces(baseClass), handler);
+ }
+
+ private static Class<?>[] getAllInterfaces(Class<?> baseClass) {
+ List interfaces = ClassUtils.getAllInterfaces(baseClass);
+ Class<?>[] result = new Class<?>[interfaces.size()];
+ int i = 0;
+ for (Object o : interfaces) {
+ result[i++] = (Class<?>)o;
+ }
+ return result;
}
private void init() throws MetaException {
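Class.getInterfaces() only returns interfaces declared directly on the class, so a RawStore implementation that inherits the interface from a base class would not expose RawStore to the proxy and the (RawStore) cast above would fail; ClassUtils.getAllInterfaces also walks superclasses and superinterfaces. A small sketch of the same idea, assuming commons-lang 2.x as imported above:

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.List;

import org.apache.commons.lang.ClassUtils;

// Sketch: build a dynamic proxy exposing every interface of baseClass,
// including interfaces inherited from its superclasses.
class ProxySketch {
  static Object proxyFor(Class<?> baseClass, InvocationHandler handler) {
    List<?> interfaces = ClassUtils.getAllInterfaces(baseClass);  // walks base classes too
    return Proxy.newProxyInstance(baseClass.getClassLoader(),
        interfaces.toArray(new Class<?>[interfaces.size()]), handler);
  }
}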
Modified: hive/branches/tez/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/build.xml?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/build.xml (original)
+++ hive/branches/tez/ql/build.xml Fri Aug 30 19:46:15 2013
@@ -250,15 +250,24 @@
<exclude name="META-INF/MANIFEST.MF"/>
</patternset>
</unzip>
- <unzip
- src="${build.ivy.lib.dir}/default/protobuf-java-${protobuf.version}.jar"
+ <unzip
+ src="${build.ivy.lib.dir}/default/protobuf-java-${protobuf.version}.jar"
dest="${build.dir.hive}/protobuf-java/classes">
<patternset>
<exclude name="META-INF"/>
<exclude name="META-INF/MANIFEST.MF"/>
</patternset>
</unzip>
- <unzip
+ <unzip
+ src="${build.ivy.lib.dir}/default/guava-${guava.version}.jar"
+ dest="${build.dir.hive}/guava/classes">
+ <patternset>
+ <exclude name="META-INF"/>
+ <exclude name="META-INF/MANIFEST.MF"/>
+ </patternset>
+ </unzip>
+
+ <unzip
src="${build.ivy.lib.dir}/default/snappy-${snappy.version}.jar"
dest="${build.dir.hive}/snappy/classes">
<patternset>
@@ -296,14 +305,11 @@
<fileset dir="${build.dir.hive}/shims/classes" includes="**/*.class"/>
<fileset dir="${build.dir.hive}/javaewah/classes" includes="**/*.class"/>
<fileset dir="${build.dir.hive}/javolution/classes" includes="**/*.class"/>
- <fileset dir="${build.dir.hive}/protobuf-java/classes"
- includes="**/*.class"/>
- <fileset dir="${build.dir.hive}/snappy/classes"
- includes="**/*.class"/>
- <fileset dir="${build.dir.hive}/jackson-core-asl/classes"
- includes="**/*.class"/>
- <fileset dir="${build.dir.hive}/jackson-mapper-asl/classes"
- includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/protobuf-java/classes" includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/snappy/classes" includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/jackson-core-asl/classes" includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/jackson-mapper-asl/classes" includes="**/*.class"/>
+ <fileset dir="${build.dir.hive}/guava/classes" includes="**/*.class"/>
<manifest>
<!-- Not putting these in their own manifest section, since that inserts
a new-line, which breaks the reading of the attributes. -->
Modified: hive/branches/tez/ql/ivy.xml
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/ivy.xml?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/ivy.xml (original)
+++ hive/branches/tez/ql/ivy.xml Fri Aug 30 19:46:15 2013
@@ -61,8 +61,6 @@
<dependency org="org.apache.hadoop" name="hadoop-yarn-client" rev="${hadoop-0.23.version}"
conf="compile->master" transitive="false" />
-
- <!-- hadoop specific guava -->
<dependency org="org.json" name="json" rev="${json.version}"/>
<dependency org="commons-collections" name="commons-collections" rev="${commons-collections.version}"/>
<dependency org="commons-configuration" name="commons-configuration" rev="${commons-configuration.version}"
@@ -95,6 +93,8 @@
<exclude org="org.apache.commons" module="commons-daemon"/><!--bad POM-->
</dependency>
+ <dependency org="com.google.guava" name="guava" rev="${guava.version}" transitive="false"/>
+
<!-- Test Dependencies -->
<dependency org="junit" name="junit" rev="${junit.version}" conf="test->default" />
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ExtractOperator.java Fri Aug 30 19:46:15 2013
@@ -50,6 +50,11 @@ public class ExtractOperator extends Ope
return OperatorType.EXTRACT;
}
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
+
/**
* @return the name of the operator
*/
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ForwardOperator.java Fri Aug 30 19:46:15 2013
@@ -41,6 +41,11 @@ public class ForwardOperator extends Ope
return OperatorType.FORWARD;
}
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
+
/**
* @return the name of the operator
*/
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Fri Aug 30 19:46:15 2013
@@ -1181,4 +1181,15 @@ public class GroupByOperator extends Ope
public OperatorType getType() {
return OperatorType.GROUPBY;
}
+
+ /**
+ * We can push the limit above a GBY running in the reducer, since it generates a single row
+ * for each group. This doesn't necessarily hold for a GBY running in mappers,
+ * so we don't push the limit above it.
+ */
+ @Override
+ public boolean acceptLimitPushdown() {
+ return getConf().getMode() == GroupByDesc.Mode.MERGEPARTIAL ||
+ getConf().getMode() == GroupByDesc.Mode.COMPLETE;
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java Fri Aug 30 19:46:15 2013
@@ -92,6 +92,12 @@ public class HashTableSinkOperator exten
private transient MapJoinTableContainer[] mapJoinTables;
private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes;
+
+ private static final Object[] EMPTY_OBJECT_ARRAY = new Object[0];
+ private static final MapJoinRowContainer EMPTY_ROW_CONTAINER = new MapJoinRowContainer();
+ static {
+ EMPTY_ROW_CONTAINER.add(EMPTY_OBJECT_ARRAY);
+ }
private transient boolean noOuterJoin;
@@ -223,19 +229,30 @@ public class HashTableSinkOperator exten
// compute keys and values as StandardObjects
MapJoinKey key = JoinUtil.computeMapJoinKeys(null, row, joinKeys[alias],
joinKeysObjectInspectors[alias]);
- Object[] value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
+ Object[] value = EMPTY_OBJECT_ARRAY;
+ if((hasFilter(alias) && filterMaps[alias].length > 0) || joinValues[alias].size() > 0) {
+ value = JoinUtil.computeMapJoinValues(row, joinValues[alias],
joinValuesObjectInspectors[alias], joinFilters[alias], joinFilterObjectInspectors[alias],
filterMaps == null ? null : filterMaps[alias]);
+ }
MapJoinTableContainer tableContainer = mapJoinTables[alias];
MapJoinRowContainer rowContainer = tableContainer.get(key);
if (rowContainer == null) {
- rowContainer = new MapJoinRowContainer();
- rowContainer.add(value);
+ if(value.length != 0) {
+ rowContainer = new MapJoinRowContainer();
+ rowContainer.add(value);
+ } else {
+ rowContainer = EMPTY_ROW_CONTAINER;
+ }
rowNumber++;
if (rowNumber > hashTableScale && rowNumber % hashTableScale == 0) {
memoryExhaustionHandler.checkMemoryStatus(tableContainer.size(), rowNumber);
}
tableContainer.put(key, rowContainer);
+ } else if (rowContainer == EMPTY_ROW_CONTAINER) {
+ rowContainer = rowContainer.copy();
+ rowContainer.add(value);
+ tableContainer.put(key, rowContainer);
} else {
rowContainer.add(value);
}
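With no value columns (and no filters) on the small-table side, every key would otherwise get its own MapJoinRowContainer holding a single empty row; EMPTY_ROW_CONTAINER above lets all such keys share one static container, which is copied only once a real value arrives for a key. A rough standalone sketch of that share-then-copy-on-write idea using plain collections (illustrative only, not the MapJoin container classes):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Keys whose values are empty all point at one shared one-row container;
// a key only gets its own private copy once a real value arrives for it.
class SharedEmptyContainerSketch<K> {
  private static final List<Object[]> EMPTY = new ArrayList<Object[]>();
  static {
    EMPTY.add(new Object[0]);                    // the single shared empty row
  }

  private final Map<K, List<Object[]>> table = new HashMap<K, List<Object[]>>();

  void put(K key, Object[] value) {
    List<Object[]> rows = table.get(key);
    if (rows == null) {
      if (value.length == 0) {
        table.put(key, EMPTY);                   // share instead of allocating per key
      } else {
        rows = new ArrayList<Object[]>();
        rows.add(value);
        table.put(key, rows);
      }
    } else if (rows == EMPTY) {
      rows = new ArrayList<Object[]>(rows);      // copy-on-write: stop sharing
      rows.add(value);
      table.put(key, rows);
    } else {
      rows.add(value);
    }
  }
}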
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Fri Aug 30 19:46:15 2013
@@ -602,6 +602,9 @@ public abstract class Operator<T extends
state = State.CLOSE;
LOG.info(id + " finished. closing... ");
+ // call the operator specific close routine
+ closeOp(abort);
+
if (counterNameToEnum != null) {
incrCounter(numInputRowsCntr, inputRows);
incrCounter(numOutputRowsCntr, outputRows);
@@ -610,9 +613,6 @@ public abstract class Operator<T extends
LOG.info(id + " forwarded " + cntr + " rows");
- // call the operator specific close routine
- closeOp(abort);
-
try {
logStats();
if (childOperators == null) {
@@ -826,13 +826,7 @@ public abstract class Operator<T extends
}
}
- if (isLogInfoEnabled) {
- cntr++;
- if (cntr == nextCntr) {
- LOG.info(id + " forwarding " + cntr + " rows");
- nextCntr = getNextCntr(cntr);
- }
- }
+ increaseForward(1);
// For debugging purposes:
// System.out.println("" + this.getClass() + ": " +
@@ -865,6 +859,18 @@ public abstract class Operator<T extends
}
}
+ void increaseForward(long counter) {
+ if (isLogInfoEnabled) {
+ cntr += counter;
+ if (cntr >= nextCntr) {
+ LOG.info(id + " forwarding " + cntr + " rows");
+ do {
+ nextCntr = getNextCntr(nextCntr);
+ } while(cntr >= nextCntr);
+ }
+ }
+ }
+
public void resetStats() {
for (Enum<?> e : statsMap.keySet()) {
statsMap.get(e).set(0L);
@@ -1525,6 +1531,17 @@ public abstract class Operator<T extends
return stats;
}
+ /**
+ * Used by LimitPushdownOptimizer.
+ *
+ * If none of the operators between the limit and the reduce sink removes input rows
+ * within the limit count, the limit can be pushed down to the reduce-sink operator.
+ * Examples: forward, select, etc.
+ */
+ public boolean acceptLimitPushdown() {
+ return false;
+ }
+
@Override
public String toString() {
return getName() + "[" + getIdentifier() + "]";
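acceptLimitPushdown() is the hook described in the javadoc above: an optimizer can only push a limit down into the reduce sink if every operator between the reduce sink and the limit keeps all input rows within the limit count. A hedged sketch of that check (the LimitPushdownOptimizer itself is not part of this diff):

import java.util.List;

import org.apache.hadoop.hive.ql.exec.Operator;

// Illustrative only: the limit may be pushed down only if every intermediate
// operator accepts it (forward, select, extract, reduce-side group-by, ...).
class LimitPushdownCheckSketch {
  static boolean canPushLimit(List<Operator<?>> operatorsBetweenRsAndLimit) {
    for (Operator<?> op : operatorsBetweenRsAndLimit) {
      if (!op.acceptLimitPushdown()) {
        return false;
      }
    }
    return true;
  }
}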
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/PTFOperator.java Fri Aug 30 19:46:15 2013
@@ -34,8 +34,6 @@ import org.apache.hadoop.hive.ql.plan.PT
import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFInputDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PartitionedTableFunctionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDeserializer;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFLeadLag;
@@ -44,11 +42,9 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
-import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
-public class PTFOperator extends Operator<PTFDesc> implements Serializable
-{
+public class PTFOperator extends Operator<PTFDesc> implements Serializable {
private static final long serialVersionUID = 1L;
PTFPartition inputPart;
@@ -67,8 +63,7 @@ public class PTFOperator extends Operato
* 4. Create input partition to store rows coming from previous operator
*/
@Override
- protected void initializeOp(Configuration jobConf) throws HiveException
- {
+ protected void initializeOp(Configuration jobConf) throws HiveException {
hiveConf = new HiveConf(jobConf, PTFOperator.class);
// if the parent is ExtractOperator, this invocation is from reduce-side
Operator<? extends OperatorDesc> parentOp = getParentOperators().get(0);
@@ -78,13 +73,10 @@ public class PTFOperator extends Operato
inputPart = createFirstPartitionForChain(
inputObjInspectors[0], hiveConf, isMapOperator);
- if (isMapOperator)
- {
+ if (isMapOperator) {
PartitionedTableFunctionDef tDef = conf.getStartOfChain();
outputObjInspector = tDef.getRawInputShape().getOI();
- }
- else
- {
+ } else {
outputObjInspector = conf.getFuncDef().getOutputShape().getOI();
}
@@ -94,16 +86,12 @@ public class PTFOperator extends Operato
}
@Override
- protected void closeOp(boolean abort) throws HiveException
- {
+ protected void closeOp(boolean abort) throws HiveException {
super.closeOp(abort);
if(inputPart.size() != 0){
- if (isMapOperator)
- {
+ if (isMapOperator) {
processMapFunction();
- }
- else
- {
+ } else {
processInputPartition();
}
}
@@ -113,8 +101,7 @@ public class PTFOperator extends Operato
@Override
public void processOp(Object row, int tag) throws HiveException
{
- if (!isMapOperator )
- {
+ if (!isMapOperator ) {
/*
* check if current row belongs to the current accumulated Partition:
* - If not:
@@ -126,20 +113,15 @@ public class PTFOperator extends Operato
boolean keysAreEqual = (currentKeys != null && newKeys != null)?
newKeys.equals(currentKeys) : false;
- if (currentKeys != null && !keysAreEqual)
- {
+ if (currentKeys != null && !keysAreEqual) {
processInputPartition();
inputPart.reset();
}
- if (currentKeys == null || !keysAreEqual)
- {
- if (currentKeys == null)
- {
+ if (currentKeys == null || !keysAreEqual) {
+ if (currentKeys == null) {
currentKeys = newKeys.copyKey();
- }
- else
- {
+ } else {
currentKeys.copyKey(newKeys);
}
}
@@ -156,16 +138,14 @@ public class PTFOperator extends Operato
* @param hiveConf
* @throws HiveException
*/
- protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException
- {
+ protected void reconstructQueryDef(HiveConf hiveConf) throws HiveException {
PTFDeserializer dS =
new PTFDeserializer(conf, (StructObjectInspector)inputObjInspectors[0], hiveConf);
dS.initializePTFChain(conf.getFuncDef());
}
- protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException
- {
+ protected void setupKeysWrapper(ObjectInspector inputOI) throws HiveException {
PartitionDef pDef = conf.getStartOfChain().getPartition();
ArrayList<PTFExpressionDef> exprs = pDef.getExpressions();
int numExprs = exprs.size();
@@ -173,8 +153,7 @@ public class PTFOperator extends Operato
ObjectInspector[] keyOIs = new ObjectInspector[numExprs];
ObjectInspector[] currentKeyOIs = new ObjectInspector[numExprs];
- for(int i=0; i<numExprs; i++)
- {
+ for(int i=0; i<numExprs; i++) {
PTFExpressionDef exprDef = exprs.get(i);
/*
* Why cannot we just use the ExprNodeEvaluator on the column?
@@ -192,29 +171,20 @@ public class PTFOperator extends Operato
newKeys = keyWrapperFactory.getKeyWrapper();
}
- protected void processInputPartition() throws HiveException
- {
+ protected void processInputPartition() throws HiveException {
PTFPartition outPart = executeChain(inputPart);
- if ( conf.forWindowing() ) {
- executeWindowExprs(outPart);
- }
- else {
- PTFPartitionIterator<Object> pItr = outPart.iterator();
- while (pItr.hasNext())
- {
- Object oRow = pItr.next();
- forward(oRow, outputObjInspector);
- }
- }
+ PTFPartitionIterator<Object> pItr = outPart.iterator();
+ while (pItr.hasNext()) {
+ Object oRow = pItr.next();
+ forward(oRow, outputObjInspector);
+ }
}
- protected void processMapFunction() throws HiveException
- {
+ protected void processMapFunction() throws HiveException {
PartitionedTableFunctionDef tDef = conf.getStartOfChain();
PTFPartition outPart = tDef.getTFunction().transformRawInput(inputPart);
PTFPartitionIterator<Object> pItr = outPart.iterator();
- while (pItr.hasNext())
- {
+ while (pItr.hasNext()) {
Object oRow = pItr.next();
forward(oRow, outputObjInspector);
}
@@ -234,8 +204,7 @@ public class PTFOperator extends Operato
@Override
- public OperatorType getType()
- {
+ public OperatorType getType() {
return OperatorType.PTF;
}
@@ -250,124 +219,23 @@ public class PTFOperator extends Operato
* @throws HiveException
*/
private PTFPartition executeChain(PTFPartition part)
- throws HiveException
- {
+ throws HiveException {
Stack<PartitionedTableFunctionDef> fnDefs = new Stack<PartitionedTableFunctionDef>();
PTFInputDef iDef = conf.getFuncDef();
- while (true)
- {
- if (iDef instanceof PartitionedTableFunctionDef)
- {
- fnDefs.push((PartitionedTableFunctionDef) iDef);
- iDef = ((PartitionedTableFunctionDef) iDef).getInput();
- }
- else
- {
- break;
- }
+
+ while (iDef instanceof PartitionedTableFunctionDef) {
+ fnDefs.push((PartitionedTableFunctionDef) iDef);
+ iDef = ((PartitionedTableFunctionDef) iDef).getInput();
}
PartitionedTableFunctionDef currFnDef;
- while (!fnDefs.isEmpty())
- {
+ while (!fnDefs.isEmpty()) {
currFnDef = fnDefs.pop();
part = currFnDef.getTFunction().execute(part);
}
return part;
}
- /**
- * If WindowingSpec contains any Windowing Expressions or has a
- * Having condition then these are processed
- * and forwarded on. Whereas when there is no having or WdwExprs
- * just forward the rows in the output Partition.
- *
- * For e.g. Consider the following query:
- * <pre>
- * {@code
- * select rank(), lead(rank(),1),...
- * from xyz
- * ...
- * having rank() > 1
- * }
- * </pre>
- * rank() gets processed as a WdwFn; Its in the oPart(output partition)
- * argument to executeWindowExprs. Here we first evaluate the having expression.
- * So the first row of oPart gets filtered out.
- * Next we evaluate lead(rank()) which is held as a WindowExpression and add it to the output.
- *
- * @param ptfDesc
- * @param oPart output partition after Window Fns are processed.
- * @param op
- * @throws HiveException
- */
- private void executeWindowExprs(PTFPartition oPart)
- throws HiveException
- {
- WindowTableFunctionDef wTFnDef = (WindowTableFunctionDef) conf.getFuncDef();
- /*
- * inputOI represents the row with WindowFn results present.
- * So in the e.g. above it will have a column for 'rank()'
- */
- StructObjectInspector inputOI = wTFnDef.getOutputFromWdwFnProcessing().getOI();
- /*
- * outputOI represents the final row with the Windowing Expressions added.
- * So in the e.g. above it will have a column for 'lead(rank())' in addition to
- * all columns in inputOI.
- */
- StructObjectInspector outputOI = wTFnDef.getOutputShape().getOI();
- int numCols = outputOI.getAllStructFieldRefs().size();
- ArrayList<WindowExpressionDef> wdwExprs = wTFnDef.getWindowExpressions();
- int numWdwExprs = wdwExprs == null ? 0 : wdwExprs.size();
- Object[] output = new Object[numCols];
-
- /*
- * If this Windowing invocation has no Window Expressions and doesn't need to be filtered,
- * we can just forward the row in the oPart partition.
- */
- boolean forwardRowsUntouched = (wdwExprs == null || wdwExprs.size() == 0 );
-
- PTFPartitionIterator<Object> pItr = oPart.iterator();
- PTFOperator.connectLeadLagFunctionsToPartition(conf, pItr);
- while (pItr.hasNext())
- {
- int colCnt = 0;
- Object oRow = pItr.next();
-
- /*
- * when there is no Windowing expressions or having;
- * just forward the Object coming out of the Partition.
- */
- if ( forwardRowsUntouched ) {
- forward(oRow, outputObjInspector);
- continue;
- }
-
- /*
- * Setup the output row columns in the following order
- * - the columns in the SelectList processed by the PTF
- * (ie the Select Exprs that have navigation expressions)
- * - the columns from the final PTF.
- */
-
- if ( wdwExprs != null ) {
- for (WindowExpressionDef wdwExpr : wdwExprs)
- {
- Object newCol = wdwExpr.getExprEvaluator().evaluate(oRow);
- output[colCnt++] = newCol;
- }
- }
-
- for(; colCnt < numCols; ) {
- StructField field = inputOI.getAllStructFieldRefs().get(colCnt - numWdwExprs);
- output[colCnt++] =
- ObjectInspectorUtils.copyToStandardObject(inputOI.getStructFieldData(oRow, field),
- field.getFieldObjectInspector());
- }
-
- forward(output, outputObjInspector);
- }
- }
/**
* Create a new Partition.
@@ -390,8 +258,7 @@ public class PTFOperator extends Operato
* @throws HiveException
*/
public PTFPartition createFirstPartitionForChain(ObjectInspector oi,
- HiveConf hiveConf, boolean isMapSide) throws HiveException
- {
+ HiveConf hiveConf, boolean isMapSide) throws HiveException {
PartitionedTableFunctionDef tabDef = conf.getStartOfChain();
TableFunctionEvaluator tEval = tabDef.getTFunction();
@@ -410,14 +277,12 @@ public class PTFOperator extends Operato
}
public static void connectLeadLagFunctionsToPartition(PTFDesc ptfDesc,
- PTFPartitionIterator<Object> pItr) throws HiveException
- {
+ PTFPartitionIterator<Object> pItr) throws HiveException {
List<ExprNodeGenericFuncDesc> llFnDescs = ptfDesc.getLlInfo().getLeadLagExprs();
if (llFnDescs == null) {
return;
}
- for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs)
- {
+ for (ExprNodeGenericFuncDesc llFnDesc : llFnDescs) {
GenericUDFLeadLag llFn = (GenericUDFLeadLag) llFnDesc
.getGenericUDF();
llFn.setpItr(pItr);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Fri Aug 30 19:46:15 2013
@@ -33,7 +33,6 @@ import org.apache.hadoop.hive.ql.plan.Ex
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
-import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.InspectableObject;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -44,13 +43,12 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
/**
* Reduce Sink Operator sends output to the reduce stage.
**/
public class ReduceSinkOperator extends TerminalOperator<ReduceSinkDesc>
- implements Serializable {
+ implements Serializable, TopNHash.BinaryCollector {
private static final long serialVersionUID = 1L;
@@ -90,6 +88,9 @@ public class ReduceSinkOperator extends
return inputAlias;
}
+ // Picks the top-N K:V pairs from the input. Can be null.
+ private transient TopNHash reducerHash;
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
@@ -131,6 +132,8 @@ public class ReduceSinkOperator extends
.newInstance();
valueSerializer.initialize(null, valueTableDesc.getProperties());
+ reducerHash = createTopKHash();
+
firstRow = true;
initializeChildren(hconf);
} catch (Exception e) {
@@ -139,14 +142,44 @@ public class ReduceSinkOperator extends
}
}
+ private TopNHash createTopKHash() {
+ int limit = conf.getTopN();
+ float percent = conf.getTopNMemoryUsage();
+ if (limit < 0 || percent <= 0) {
+ return null;
+ }
+ if (limit == 0) {
+ return TopNHash.create0();
+ }
+ // limit * 64 : compensates for the arrays holding keys/values/hashcodes
+ long threshold = (long) (percent * Runtime.getRuntime().maxMemory()) - limit * 64;
+ if (threshold < 0) {
+ return null;
+ }
+ return TopNHash.create(conf.isMapGroupBy(), limit, threshold, this);
+ }
+
transient InspectableObject tempInspectableObject = new InspectableObject();
transient HiveKey keyWritable = new HiveKey();
- transient Writable value;
transient StructObjectInspector keyObjectInspector;
transient StructObjectInspector valueObjectInspector;
transient ObjectInspector[] partitionObjectInspectors;
+ /**
+ * This two-dimensional array holds key data and a corresponding Union object
+ * which contains the tag identifying the aggregate expression for distinct columns.
+ *
+ * If there is no distinct expression, cachedKeys simply looks like this:
+ * cachedKeys[0] = [col0][col1]
+ *
+ * With two distinct expressions, a union(tag:key) is attached for each distinct expression:
+ * cachedKeys[0] = [col0][col1][0:dist1]
+ * cachedKeys[1] = [col0][col1][1:dist2]
+ *
+ * In this case, the child GBY evaluates distinct values with an expression like KEY.col2:0.dist1;
+ * see {@link ExprNodeColumnEvaluator}
+ */
transient Object[][] cachedKeys;
transient Object[] cachedValues;
transient List<List<Integer>> distinctColIndices;
@@ -198,6 +231,7 @@ public class ReduceSinkOperator extends
}
@Override
+ @SuppressWarnings("unchecked")
public void processOp(Object row, int tag) throws HiveException {
try {
ObjectInspector rowInspector = inputObjInspectors[tag];
@@ -241,8 +275,6 @@ public class ReduceSinkOperator extends
for (int i = 0; i < valueEval.length; i++) {
cachedValues[i] = valueEval[i].evaluate(row);
}
- // Serialize the value
- value = valueSerializer.serialize(cachedValues, valueObjectInspector);
// Evaluate the keys
Object[] distributionKeys = new Object[numDistributionKeys];
@@ -267,6 +299,8 @@ public class ReduceSinkOperator extends
// no distinct key
System.arraycopy(distributionKeys, 0, cachedKeys[0], 0, numDistributionKeys);
}
+
+ BytesWritable value = null;
// Serialize the keys and append the tag
for (int i = 0; i < cachedKeys.length; i++) {
if (keyIsText) {
@@ -294,26 +328,85 @@ public class ReduceSinkOperator extends
}
}
keyWritable.setHashCode(keyHashCode);
- if (out != null) {
- out.collect(keyWritable, value);
- // Since this is a terminal operator, update counters explicitly -
- // forward is not called
- if (counterNameToEnum != null) {
- ++outputRows;
- if (outputRows % 1000 == 0) {
- incrCounter(numOutputRowsCntr, outputRows);
- outputRows = 0;
+
+ if (reducerHash == null) {
+ if (null != out) {
+ collect(keyWritable, value = getValue(row, value));
+ }
+ } else {
+ int index = reducerHash.indexOf(keyWritable);
+ if (index == TopNHash.EXCLUDED) {
+ continue;
+ }
+ value = getValue(row, value);
+ if (index >= 0) {
+ reducerHash.set(index, value);
+ } else {
+ if (index == TopNHash.FORWARD) {
+ collect(keyWritable, value);
+ } else if (index == TopNHash.FLUSH) {
+ LOG.info("Top-N hash is flushed");
+ reducerHash.flush();
+ // we can now retry adding key/value into hash, which is flushed.
+ // but for simplicity, just forward them
+ collect(keyWritable, value);
+ } else if (index == TopNHash.DISABLE) {
+ LOG.info("Top-N hash is disabled");
+ reducerHash.flush();
+ collect(keyWritable, value);
+ reducerHash = null;
}
}
}
}
- } catch (SerDeException e) {
- throw new HiveException(e);
- } catch (IOException e) {
+ } catch (HiveException e) {
+ throw e;
+ } catch (Exception e) {
throw new HiveException(e);
}
}
+ public void collect(BytesWritable key, BytesWritable value) throws IOException {
+ // Since this is a terminal operator, update counters explicitly -
+ // forward is not called
+ out.collect(key, value);
+ if (++outputRows % 1000 == 0) {
+ if (counterNameToEnum != null) {
+ incrCounter(numOutputRowsCntr, outputRows);
+ }
+ increaseForward(outputRows);
+ outputRows = 0;
+ }
+ }
+
+ // evaluate value lazily
+ private BytesWritable getValue(Object row, BytesWritable value) throws Exception {
+ if (value != null) {
+ return value;
+ }
+ // Evaluate the value
+ for (int i = 0; i < valueEval.length; i++) {
+ cachedValues[i] = valueEval[i].evaluate(row);
+ }
+ // Serialize the value
+ return (BytesWritable) valueSerializer.serialize(cachedValues, valueObjectInspector);
+ }
+
+ @Override
+ protected void closeOp(boolean abort) throws HiveException {
+ if (!abort && reducerHash != null) {
+ try {
+ reducerHash.flush();
+ } catch (IOException e) {
+ throw new HiveException(e);
+ } finally {
+ reducerHash = null;
+ }
+ }
+ reducerHash = null;
+ super.closeOp(abort);
+ }
+
/**
* @return the name of the operator
*/
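The reducerHash handling above follows a return-code protocol on TopNHash.indexOf: a non-negative index means the key was retained and the (lazily serialized) value should be stored in that slot; EXCLUDED means the row falls outside the top N and can be dropped before the value is ever serialized; FORWARD, FLUSH and DISABLE all emit the pair directly, with FLUSH and DISABLE first flushing the hash, and DISABLE additionally turning it off. A condensed restatement of that dispatch (reuses the reducerHash, collect and getValue members shown in the diff; illustrative, not a drop-in replacement for processOp):

// Condensed view of the top-N branch inside processOp above (sketch).
private void emitWithTopN(HiveKey keyWritable, Object row) throws Exception {
  if (reducerHash == null) {                     // top-N disabled: plain collect
    if (out != null) {
      collect(keyWritable, getValue(row, null));
    }
    return;
  }
  int index = reducerHash.indexOf(keyWritable);
  if (index == TopNHash.EXCLUDED) {
    return;                                      // outside the top N; value never serialized
  }
  BytesWritable value = getValue(row, null);
  if (index >= 0) {
    reducerHash.set(index, value);               // retained; emitted when the hash flushes
  } else if (index == TopNHash.FORWARD) {
    collect(keyWritable, value);
  } else {                                       // TopNHash.FLUSH or TopNHash.DISABLE
    reducerHash.flush();
    collect(keyWritable, value);
    if (index == TopNHash.DISABLE) {
      reducerHash = null;                        // stop using the hash from here on
    }
  }
}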
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/SelectOperator.java Fri Aug 30 19:46:15 2013
@@ -124,4 +124,9 @@ public class SelectOperator extends Oper
public boolean supportUnionRemoveOptimization() {
return true;
}
+
+ @Override
+ public boolean acceptLimitPushdown() {
+ return true;
+ }
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveKey.java Fri Aug 30 19:46:15 2013
@@ -35,6 +35,12 @@ public class HiveKey extends BytesWritab
hashCodeValid = false;
}
+ public HiveKey(byte[] bytes, int hashcode) {
+ super(bytes);
+ myHashCode = hashcode;
+ hashCodeValid = true;
+ }
+
protected int myHashCode;
public void setHashCode(int myHashCode) {
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java Fri Aug 30 19:46:15 2013
@@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.io.orc
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import java.io.IOException;
@@ -47,19 +48,67 @@ public final class OrcFile {
* prevent the new reader from reading ORC files generated by any released
* version of Hive.
*/
- public static final int MAJOR_VERSION = 0;
- public static final int MINOR_VERSION = 11;
+ public static enum Version {
+ V_0_11("0.11", 0, 11),
+ V_0_12("0.12", 0, 12);
+
+ public static final Version CURRENT = V_0_12;
+
+ private final String name;
+ private final int major;
+ private final int minor;
+
+ private Version(String name, int major, int minor) {
+ this.name = name;
+ this.major = major;
+ this.minor = minor;
+ }
+
+ public static Version byName(String name) {
+ for(Version version: values()) {
+ if (version.name.equals(name)) {
+ return version;
+ }
+ }
+ throw new IllegalArgumentException("Unknown ORC version " + name);
+ }
+
+ /**
+ * Get the human readable name for the version.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Get the major version number.
+ */
+ public int getMajor() {
+ return major;
+ }
+
+ /**
+ * Get the minor version number.
+ */
+ public int getMinor() {
+ return minor;
+ }
+ }
// the table properties that control ORC files
public static final String COMPRESSION = "orc.compress";
- static final String DEFAULT_COMPRESSION = "ZLIB";
public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size";
- static final String DEFAULT_COMPRESSION_BLOCK_SIZE = "262144";
public static final String STRIPE_SIZE = "orc.stripe.size";
- static final String DEFAULT_STRIPE_SIZE = "268435456";
public static final String ROW_INDEX_STRIDE = "orc.row.index.stride";
- static final String DEFAULT_ROW_INDEX_STRIDE = "10000";
public static final String ENABLE_INDEXES = "orc.create.index";
+ public static final String BLOCK_PADDING = "orc.block.padding";
+
+ static final long DEFAULT_STRIPE_SIZE = 256 * 1024 * 1024;
+ static final CompressionKind DEFAULT_COMPRESSION_KIND =
+ CompressionKind.ZLIB;
+ static final int DEFAULT_BUFFER_SIZE = 256 * 1024;
+ static final int DEFAULT_ROW_INDEX_STRIDE = 10000;
+ static final boolean DEFAULT_BLOCK_PADDING = true;
// unused
private OrcFile() {}
@@ -77,7 +126,145 @@ public final class OrcFile {
}
/**
- * Create an ORC file streamFactory.
+ * Options for creating ORC file writers.
+ */
+ public static class WriterOptions {
+ private final Configuration configuration;
+ private FileSystem fileSystemValue = null;
+ private ObjectInspector inspectorValue = null;
+ private long stripeSizeValue = DEFAULT_STRIPE_SIZE;
+ private int rowIndexStrideValue = DEFAULT_ROW_INDEX_STRIDE;
+ private int bufferSizeValue = DEFAULT_BUFFER_SIZE;
+ private boolean blockPaddingValue = DEFAULT_BLOCK_PADDING;
+ private CompressionKind compressValue = DEFAULT_COMPRESSION_KIND;
+ private MemoryManager memoryManagerValue;
+ private Version versionValue;
+
+ WriterOptions(Configuration conf) {
+ configuration = conf;
+ memoryManagerValue = getMemoryManager(conf);
+ String versionName =
+ conf.get(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
+ if (versionName == null) {
+ versionValue = Version.CURRENT;
+ } else {
+ versionValue = Version.byName(versionName);
+ }
+ }
+
+ /**
+ * Provide the filesystem for the path, if the client has it available.
+ * If it is not provided, it will be found from the path.
+ */
+ public WriterOptions fileSystem(FileSystem value) {
+ fileSystemValue = value;
+ return this;
+ }
+
+ /**
+ * Set the stripe size for the file. The writer stores the contents of the
+ * stripe in memory until this memory limit is reached and the stripe
+ * is flushed to the HDFS file and the next stripe started.
+ */
+ public WriterOptions stripeSize(long value) {
+ stripeSizeValue = value;
+ return this;
+ }
+
+ /**
+ * Set the distance between entries in the row index. The minimum value is
+ * 1000 to prevent the index from overwhelming the data. If the stride is
+ * set to 0, no indexes will be included in the file.
+ */
+ public WriterOptions rowIndexStride(int value) {
+ rowIndexStrideValue = value;
+ return this;
+ }
+
+ /**
+ * The size of the memory buffers used for compressing and storing the
+ * stripe in memory.
+ */
+ public WriterOptions bufferSize(int value) {
+ bufferSizeValue = value;
+ return this;
+ }
+
+ /**
+ * Sets whether the HDFS blocks are padded to prevent stripes from
+ * straddling blocks. Padding improves locality and thus the speed of
+ * reading, but costs space.
+ */
+ public WriterOptions blockPadding(boolean value) {
+ blockPaddingValue = value;
+ return this;
+ }
+
+ /**
+ * Sets the generic compression that is used to compress the data.
+ */
+ public WriterOptions compress(CompressionKind value) {
+ compressValue = value;
+ return this;
+ }
+
+ /**
+ * A required option that sets the object inspector for the rows. Used
+ * to determine the schema for the file.
+ */
+ public WriterOptions inspector(ObjectInspector value) {
+ inspectorValue = value;
+ return this;
+ }
+
+ /**
+ * Sets the version of the file that will be written.
+ */
+ public WriterOptions version(Version value) {
+ versionValue = value;
+ return this;
+ }
+
+ /**
+ * A package local option to set the memory manager.
+ */
+ WriterOptions memory(MemoryManager value) {
+ memoryManagerValue = value;
+ return this;
+ }
+ }
+
+ /**
+ * Create a default set of write options that can be modified.
+ */
+ public static WriterOptions writerOptions(Configuration conf) {
+ return new WriterOptions(conf);
+ }
+
+ /**
+ * Create an ORC file writer. This is the public interface for creating
+ * writers going forward and new options will only be added to this method.
+ * @param path filename to write to
+ * @param options the options
+ * @return a new ORC file writer
+ * @throws IOException
+ */
+ public static Writer createWriter(Path path,
+ WriterOptions opts
+ ) throws IOException {
+ FileSystem fs = opts.fileSystemValue == null ?
+ path.getFileSystem(opts.configuration) : opts.fileSystemValue;
+
+ return new WriterImpl(fs, path, opts.configuration, opts.inspectorValue,
+ opts.stripeSizeValue, opts.compressValue,
+ opts.bufferSizeValue, opts.rowIndexStrideValue,
+ opts.memoryManagerValue, opts.blockPaddingValue,
+ opts.versionValue);
+ }
+
+ /**
+ * Create an ORC file writer. This method is provided for API backward
+ * compatibility with Hive 0.11.
* @param fs file system
* @param path filename to write to
* @param inspector the ObjectInspector that inspects the rows
@@ -86,7 +273,7 @@ public final class OrcFile {
* @param bufferSize the number of bytes to compress at once
* @param rowIndexStride the number of rows between row index entries or
* 0 to suppress all indexes
- * @return a new ORC file streamFactory
+ * @return a new ORC file writer
* @throws IOException
*/
public static Writer createWriter(FileSystem fs,
@@ -97,8 +284,14 @@ public final class OrcFile {
CompressionKind compress,
int bufferSize,
int rowIndexStride) throws IOException {
- return new WriterImpl(fs, path, conf, inspector, stripeSize, compress,
- bufferSize, rowIndexStride, getMemoryManager(conf));
+ return createWriter(path,
+ writerOptions(conf)
+ .fileSystem(fs)
+ .inspector(inspector)
+ .stripeSize(stripeSize)
+ .compress(compress)
+ .bufferSize(bufferSize)
+ .rowIndexStride(rowIndexStride));
}
private static MemoryManager memoryManager = null;
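The backward-compatible createWriter above already shows the intended use of the new API: build a WriterOptions with writerOptions(conf), chain the setters you need, and pass it to createWriter(Path, WriterOptions). A short usage sketch along the same lines (option values are illustrative; anything not set falls back to the defaults defined above; assumes the same package as OrcFile, org.apache.hadoop.hive.ql.io.orc):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

// Illustrative: create an ORC writer with a few explicitly chosen options.
class OrcWriterOptionsSketch {
  static Writer open(Configuration conf, Path path, ObjectInspector inspector)
      throws IOException {
    return OrcFile.createWriter(path,
        OrcFile.writerOptions(conf)
            .inspector(inspector)                // required: defines the schema
            .compress(CompressionKind.SNAPPY)    // illustrative choice
            .stripeSize(128L * 1024 * 1024)      // example: 128MB stripes
            .rowIndexStride(10000)
            .blockPadding(true)
            .version(OrcFile.Version.V_0_12));
  }
}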
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java Fri Aug 30 19:46:15 2013
@@ -47,32 +47,20 @@ public class OrcOutputFormat extends Fil
implements RecordWriter<NullWritable, OrcSerdeRow>,
FileSinkOperator.RecordWriter {
private Writer writer = null;
- private final FileSystem fs;
private final Path path;
- private final Configuration conf;
- private final long stripeSize;
- private final int compressionSize;
- private final CompressionKind compress;
- private final int rowIndexStride;
-
- OrcRecordWriter(FileSystem fs, Path path, Configuration conf,
- String stripeSize, String compress,
- String compressionSize, String rowIndexStride) {
- this.fs = fs;
+ private final OrcFile.WriterOptions options;
+
+ OrcRecordWriter(Path path, OrcFile.WriterOptions options) {
this.path = path;
- this.conf = conf;
- this.stripeSize = Long.valueOf(stripeSize);
- this.compress = CompressionKind.valueOf(compress);
- this.compressionSize = Integer.valueOf(compressionSize);
- this.rowIndexStride = Integer.valueOf(rowIndexStride);
+ this.options = options;
}
@Override
public void write(NullWritable nullWritable,
OrcSerdeRow row) throws IOException {
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, this.conf, row.getInspector(),
- stripeSize, compress, compressionSize, rowIndexStride);
+ options.inspector(row.getInspector());
+ writer = OrcFile.createWriter(path, options);
}
writer.addRow(row.getRow());
}
@@ -81,9 +69,8 @@ public class OrcOutputFormat extends Fil
public void write(Writable row) throws IOException {
OrcSerdeRow serdeRow = (OrcSerdeRow) row;
if (writer == null) {
- writer = OrcFile.createWriter(fs, path, this.conf,
- serdeRow.getInspector(), stripeSize, compress, compressionSize,
- rowIndexStride);
+ options.inspector(serdeRow.getInspector());
+ writer = OrcFile.createWriter(path, options);
}
writer.addRow(serdeRow.getRow());
}
@@ -102,8 +89,8 @@ public class OrcOutputFormat extends Fil
ObjectInspector inspector = ObjectInspectorFactory.
getStandardStructObjectInspector(new ArrayList<String>(),
new ArrayList<ObjectInspector>());
- writer = OrcFile.createWriter(fs, path, this.conf, inspector,
- stripeSize, compress, compressionSize, rowIndexStride);
+ options.inspector(inspector);
+ writer = OrcFile.createWriter(path, options);
}
writer.close();
}
@@ -113,9 +100,8 @@ public class OrcOutputFormat extends Fil
public RecordWriter<NullWritable, OrcSerdeRow>
getRecordWriter(FileSystem fileSystem, JobConf conf, String name,
Progressable reporter) throws IOException {
- return new OrcRecordWriter(fileSystem, new Path(name), conf,
- OrcFile.DEFAULT_STRIPE_SIZE, OrcFile.DEFAULT_COMPRESSION,
- OrcFile.DEFAULT_COMPRESSION_BLOCK_SIZE, OrcFile.DEFAULT_ROW_INDEX_STRIDE);
+ return new
+ OrcRecordWriter(new Path(name), OrcFile.writerOptions(conf));
}
@Override
@@ -126,20 +112,42 @@ public class OrcOutputFormat extends Fil
boolean isCompressed,
Properties tableProperties,
Progressable reporter) throws IOException {
- String stripeSize = tableProperties.getProperty(OrcFile.STRIPE_SIZE,
- OrcFile.DEFAULT_STRIPE_SIZE);
- String compression = tableProperties.getProperty(OrcFile.COMPRESSION,
- OrcFile.DEFAULT_COMPRESSION);
- String compressionSize =
- tableProperties.getProperty(OrcFile.COMPRESSION_BLOCK_SIZE,
- OrcFile.DEFAULT_COMPRESSION_BLOCK_SIZE);
- String rowIndexStride =
- tableProperties.getProperty(OrcFile.ROW_INDEX_STRIDE,
- OrcFile.DEFAULT_ROW_INDEX_STRIDE);
- if ("false".equals(tableProperties.getProperty(OrcFile.ENABLE_INDEXES))) {
- rowIndexStride = "0";
+ OrcFile.WriterOptions options = OrcFile.writerOptions(conf);
+ if (tableProperties.containsKey(OrcFile.STRIPE_SIZE)) {
+ options.stripeSize(Long.parseLong
+ (tableProperties.getProperty(OrcFile.STRIPE_SIZE)));
+ }
+
+ if (tableProperties.containsKey(OrcFile.COMPRESSION)) {
+ options.compress(CompressionKind.valueOf
+ (tableProperties.getProperty(OrcFile.COMPRESSION)));
+ }
+
+ if (tableProperties.containsKey(OrcFile.COMPRESSION_BLOCK_SIZE)) {
+ options.bufferSize(Integer.parseInt
+ (tableProperties.getProperty
+ (OrcFile.COMPRESSION_BLOCK_SIZE)));
+ }
+
+ if (tableProperties.containsKey(OrcFile.ROW_INDEX_STRIDE)) {
+ options.rowIndexStride(Integer.parseInt
+ (tableProperties.getProperty
+ (OrcFile.ROW_INDEX_STRIDE)));
}
- return new OrcRecordWriter(path.getFileSystem(conf), path, conf,
- stripeSize, compression, compressionSize, rowIndexStride);
+
+ if (tableProperties.containsKey(OrcFile.ENABLE_INDEXES)) {
+ if ("false".equals(tableProperties.getProperty
+ (OrcFile.ENABLE_INDEXES))) {
+ options.rowIndexStride(0);
+ }
+ }
+
+ if (tableProperties.containsKey(OrcFile.BLOCK_PADDING)) {
+ options.blockPadding(Boolean.parseBoolean
+ (tableProperties.getProperty
+ (OrcFile.BLOCK_PADDING)));
+ }
+
+ return new OrcRecordWriter(path, options);
}
}
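getHiveRecordWriter above maps the ORC table properties onto the new WriterOptions one by one. For reference, a hedged example of a table-properties bag using the constants defined in OrcFile and the option each one feeds (values are illustrative):

import java.util.Properties;

import org.apache.hadoop.hive.ql.io.orc.OrcFile;

// Illustrative only: ORC table properties and the WriterOptions they drive.
class OrcTablePropertiesSketch {
  static Properties example() {
    Properties props = new Properties();
    props.setProperty(OrcFile.STRIPE_SIZE, "134217728");         // -> options.stripeSize(...)
    props.setProperty(OrcFile.COMPRESSION, "SNAPPY");            // -> options.compress(...)
    props.setProperty(OrcFile.COMPRESSION_BLOCK_SIZE, "262144"); // -> options.bufferSize(...)
    props.setProperty(OrcFile.ROW_INDEX_STRIDE, "10000");        // -> options.rowIndexStride(...)
    props.setProperty(OrcFile.ENABLE_INDEXES, "true");           // "false" forces rowIndexStride(0)
    props.setProperty(OrcFile.BLOCK_PADDING, "true");            // -> options.blockPadding(...)
    return props;
  }
}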
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/ReaderImpl.java Fri Aug 30 19:46:15 2013
@@ -248,11 +248,13 @@ final class ReaderImpl implements Reader
if (version.size() >= 2) {
minor = version.get(1);
}
- if (major > OrcFile.MAJOR_VERSION ||
- (major == OrcFile.MAJOR_VERSION && minor > OrcFile.MINOR_VERSION)) {
- log.warn("ORC file " + path + " was written by a future Hive version " +
- versionString(version) + ". This file may not be readable by " +
- "this version of Hive.");
+ if (major > OrcFile.Version.CURRENT.getMajor() ||
+ (major == OrcFile.Version.CURRENT.getMajor() &&
+ minor > OrcFile.Version.CURRENT.getMinor())) {
+ log.warn("ORC file " + path +
+ " was written by a future Hive version " +
+ versionString(version) +
+ ". This file may not be readable by this version of Hive.");
}
}
}
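The reader now compares the file's version list against the writer's Version enum rather than fixed MAJOR/MINOR constants. A standalone sketch of that forward-compatibility check, with CURRENT_MAJOR/CURRENT_MINOR as hypothetical stand-ins for OrcFile.Version.CURRENT:

    import java.util.Arrays;
    import java.util.List;

    public class OrcVersionCheckSketch {
      // Hypothetical stand-ins for OrcFile.Version.CURRENT.getMajor()/getMinor().
      static final int CURRENT_MAJOR = 0;
      static final int CURRENT_MINOR = 12;

      /** True when the file claims a writer version newer than this reader. */
      static boolean writtenByFutureVersion(List<Integer> version) {
        int major = version.isEmpty() ? 0 : version.get(0);
        int minor = version.size() >= 2 ? version.get(1) : 0;
        return major > CURRENT_MAJOR
            || (major == CURRENT_MAJOR && minor > CURRENT_MINOR);
      }

      public static void main(String[] args) {
        System.out.println(writtenByFutureVersion(Arrays.asList(0, 12)));  // false
        System.out.println(writtenByFutureVersion(Arrays.asList(0, 13)));  // true
      }
    }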
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/orc/WriterImpl.java Fri Aug 30 19:46:15 2013
@@ -82,13 +82,18 @@ class WriterImpl implements Writer, Memo
private static final int HDFS_BUFFER_SIZE = 256 * 1024;
private static final int MIN_ROW_INDEX_STRIDE = 1000;
+ // HDFS requires blocks < 2GB and multiples of 512, so pick 1.5GB
+ private static final long MAX_BLOCK_SIZE = 1536 * 1024 * 1024;
+
private final FileSystem fs;
private final Path path;
private final long stripeSize;
private final int rowIndexStride;
private final CompressionKind compress;
private final CompressionCodec codec;
+ private final boolean addBlockPadding;
private final int bufferSize;
+ private final long blockSize;
// the streams that make up the current stripe
private final Map<StreamName, BufferedStream> streams =
new TreeMap<StreamName, BufferedStream>();
@@ -113,6 +118,7 @@ class WriterImpl implements Writer, Memo
OrcProto.RowIndex.newBuilder();
private final boolean buildIndex;
private final MemoryManager memoryManager;
+ private final OrcFile.Version version;
private final Configuration conf;
@@ -124,11 +130,17 @@ class WriterImpl implements Writer, Memo
CompressionKind compress,
int bufferSize,
int rowIndexStride,
- MemoryManager memoryManager) throws IOException {
+ MemoryManager memoryManager,
+ boolean addBlockPadding,
+ OrcFile.Version version) throws IOException {
this.fs = fs;
this.path = path;
this.conf = conf;
this.stripeSize = stripeSize;
+ this.version = version;
+ this.addBlockPadding = addBlockPadding;
+ // pick large block size to minimize block over or under hangs
+ this.blockSize = Math.min(MAX_BLOCK_SIZE, 2 * stripeSize);
this.compress = compress;
this.bufferSize = bufferSize;
this.rowIndexStride = rowIndexStride;
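The constructor now derives the HDFS block size from the stripe size instead of clamping at Integer.MAX_VALUE: two stripes per block, capped at 1.5 GB. A small arithmetic sketch (the stripe sizes below are illustrative only):

    public class OrcBlockSizeSketch {
      // Same cap as above: HDFS wants blocks < 2 GB and multiples of 512.
      static final long MAX_BLOCK_SIZE = 1536L * 1024 * 1024;

      static long blockSizeFor(long stripeSize) {
        // Two stripes per block, unless that would exceed the 1.5 GB cap.
        return Math.min(MAX_BLOCK_SIZE, 2 * stripeSize);
      }

      public static void main(String[] args) {
        System.out.println(blockSizeFor(256L * 1024 * 1024));   // 536870912  (512 MB)
        System.out.println(blockSizeFor(1024L * 1024 * 1024));  // 1610612736 (capped at 1.5 GB)
      }
    }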
@@ -249,6 +261,19 @@ class WriterImpl implements Writer, Memo
}
/**
+ * Get the number of bytes that will be written to the output. Assumes
+ * the stream has already been flushed.
+ * @return the number of bytes
+ */
+ public long getOutputSize() {
+ long result = 0;
+ for(ByteBuffer buffer: output) {
+ result += buffer.remaining();
+ }
+ return result;
+ }
+
+ /**
* Write the saved compressed buffers to the OutputStream.
* @param out the stream to write to
* @throws IOException
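The new getOutputSize() relies on ByteBuffer.remaining(), i.e. the bytes between position and limit, which is why the stream must already be flushed when the size is read. A tiny standalone sketch of the same summation:

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.List;

    public class OutputSizeSketch {
      /** Mirrors BufferedStream.getOutputSize(): total readable bytes across buffers. */
      static long outputSize(List<ByteBuffer> output) {
        long result = 0;
        for (ByteBuffer buffer : output) {
          result += buffer.remaining();   // bytes between position and limit
        }
        return result;
      }

      public static void main(String[] args) {
        ByteBuffer a = ByteBuffer.wrap(new byte[100]);  // remaining() == 100
        ByteBuffer b = ByteBuffer.wrap(new byte[40]);   // remaining() == 40
        System.out.println(outputSize(Arrays.asList(a, b)));  // 140
      }
    }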
@@ -359,6 +384,13 @@ class WriterImpl implements Writer, Memo
public Configuration getConfiguration() {
return conf;
}
+
+ /**
+ * Get the version of the file to write.
+ */
+ public OrcFile.Version getVersion() {
+ return version;
+ }
}
/**
@@ -442,20 +474,7 @@ class WriterImpl implements Writer, Memo
}
boolean isNewWriteFormat(StreamFactory writer) {
- String writeFormat = writer.getConfiguration().get(
- HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.varname);
- if (writeFormat == null) {
- LOG.warn("ORC write format not defined. Using 0.12 ORC write format.");
- return true;
- }
- if (writeFormat
- .equals(HiveConf.ConfVars.HIVE_ORC_WRITE_FORMAT.defaultVal)) {
- LOG.info("Using 0.11 ORC write format.");
- return false;
- }
-
- LOG.info("Using 0.12 ORC write format.");
- return true;
+ return writer.getVersion() != OrcFile.Version.V_0_11;
}
/**
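The write-format decision now keys off the Version enum carried by the writer options instead of re-parsing HIVE_ORC_WRITE_FORMAT from the configuration each time. A toy sketch of the equivalent check, with a local enum standing in for OrcFile.Version:

    public class WriteFormatSketch {
      // Stand-in for OrcFile.Version: 0.11 is the only "old" format; anything
      // newer uses the 0.12 encodings.
      enum Version { V_0_11, V_0_12 }

      static boolean isNewWriteFormat(Version version) {
        return version != Version.V_0_11;
      }

      public static void main(String[] args) {
        System.out.println(isNewWriteFormat(Version.V_0_11));  // false
        System.out.println(isNewWriteFormat(Version.V_0_12));  // true
      }
    }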
@@ -874,9 +893,10 @@ class WriterImpl implements Writer, Memo
// Set the flag indicating whether or not to use dictionary encoding
// based on whether or not the fraction of distinct keys over number of
// non-null rows is less than the configured threshold
- useDictionaryEncoding = rows.size() > 0 &&
- (float)(dictionary.size()) / rows.size() <=
- dictionaryKeySizeThreshold;
+ useDictionaryEncoding =
+ (!isDirectV2) || (rows.size() > 0 &&
+ (float)(dictionary.size()) / rows.size() <=
+ dictionaryKeySizeThreshold);
final int[] dumpOrder = new int[dictionary.size()];
if (useDictionaryEncoding) {
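With the direct v2 (0.12) encodings available, the writer can now abandon dictionary encoding when a column is not repetitive enough, while the 0.11 path keeps the dictionary unconditionally. A standalone sketch of the same ratio test (the 0.8 threshold is illustrative, not necessarily the configured default):

    public class DictionaryEncodingSketch {
      /**
       * Mirrors the decision above: the 0.11 writer (not direct v2) always keeps
       * dictionary encoding; the 0.12 writer keeps it only while the number of
       * distinct keys stays a small fraction of the non-null rows.
       */
      static boolean useDictionary(boolean isDirectV2, int dictionarySize,
                                   int rowCount, float threshold) {
        return !isDirectV2
            || (rowCount > 0 && (float) dictionarySize / rowCount <= threshold);
      }

      public static void main(String[] args) {
        // 800 distinct keys over 10,000 rows, threshold 0.8 -> keep the dictionary.
        System.out.println(useDictionary(true, 800, 10000, 0.8f));    // true
        // Nearly every value distinct -> fall back to direct encoding.
        System.out.println(useDictionary(true, 9900, 10000, 0.8f));   // false
        // Old 0.11 format ignores the ratio entirely.
        System.out.println(useDictionary(false, 9900, 10000, 0.8f));  // true
      }
    }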
@@ -1600,12 +1620,11 @@ class WriterImpl implements Writer, Memo
private void ensureWriter() throws IOException {
if (rawWriter == null) {
rawWriter = fs.create(path, false, HDFS_BUFFER_SIZE,
- fs.getDefaultReplication(),
- Math.min(stripeSize * 2L, Integer.MAX_VALUE));
+ fs.getDefaultReplication(), blockSize);
rawWriter.writeBytes(OrcFile.MAGIC);
headerLength = rawWriter.getPos();
writer = new OutStream("metadata", bufferSize, codec,
- new DirectStream(rawWriter));
+ new DirectStream(rawWriter));
protobufWriter = CodedOutputStream.newInstance(writer);
}
}
@@ -1621,43 +1640,70 @@ class WriterImpl implements Writer, Memo
createRowIndexEntry();
}
if (rowsInStripe != 0) {
+
+ // finalize the data for the stripe
int requiredIndexEntries = rowIndexStride == 0 ? 0 :
(int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
OrcProto.StripeFooter.Builder builder =
OrcProto.StripeFooter.newBuilder();
treeWriter.writeStripe(builder, requiredIndexEntries);
- long start = rawWriter.getPos();
- long section = start;
- long indexEnd = start;
+ long indexSize = 0;
+ long dataSize = 0;
for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
BufferedStream stream = pair.getValue();
if (!stream.isSuppressed()) {
stream.flush();
- stream.spillTo(rawWriter);
- long end = rawWriter.getPos();
StreamName name = pair.getKey();
+ long streamSize = pair.getValue().getOutputSize();
builder.addStreams(OrcProto.Stream.newBuilder()
- .setColumn(name.getColumn())
- .setKind(name.getKind())
- .setLength(end-section));
- section = end;
+ .setColumn(name.getColumn())
+ .setKind(name.getKind())
+ .setLength(streamSize));
if (StreamName.Area.INDEX == name.getArea()) {
- indexEnd = end;
+ indexSize += streamSize;
+ } else {
+ dataSize += streamSize;
}
}
+ }
+ OrcProto.StripeFooter footer = builder.build();
+
+ // Do we need to pad the file so the stripe doesn't straddle a block
+ // boundary?
+ long start = rawWriter.getPos();
+ long stripeSize = indexSize + dataSize + footer.getSerializedSize();
+ if (addBlockPadding &&
+ stripeSize < blockSize &&
+ (start % blockSize) + stripeSize > blockSize) {
+ long padding = blockSize - (start % blockSize);
+ byte[] pad = new byte[(int) Math.min(HDFS_BUFFER_SIZE, padding)];
+ start += padding;
+ while (padding > 0) {
+ int writeLen = (int) Math.min(padding, pad.length);
+ rawWriter.write(pad, 0, writeLen);
+ padding -= writeLen;
+ }
+ }
+
+ // write out the data streams
+ for(Map.Entry<StreamName, BufferedStream> pair: streams.entrySet()) {
+ BufferedStream stream = pair.getValue();
+ if (!stream.isSuppressed()) {
+ stream.spillTo(rawWriter);
+ }
stream.clear();
}
- builder.build().writeTo(protobufWriter);
+ footer.writeTo(protobufWriter);
protobufWriter.flush();
writer.flush();
- long end = rawWriter.getPos();
+ long footerLength = rawWriter.getPos() - start - dataSize - indexSize;
OrcProto.StripeInformation dirEntry =
OrcProto.StripeInformation.newBuilder()
.setOffset(start)
- .setIndexLength(indexEnd - start)
- .setDataLength(section - indexEnd)
.setNumberOfRows(rowsInStripe)
- .setFooterLength(end - section).build();
+ .setIndexLength(indexSize)
+ .setDataLength(dataSize)
+ .setFooterLength(footerLength).build();
stripes.add(dirEntry);
rowCount += rowsInStripe;
rowsInStripe = 0;
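Because each stream's size is now known before anything is written, flushStripe() can pad the file so that a stripe which would straddle an HDFS block boundary starts on the next block instead. A worked sketch of that padding arithmetic (block and stripe sizes below are illustrative):

    public class StripePaddingSketch {
      /**
       * How many filler bytes to write before the stripe, following the condition
       * in flushStripe() above: pad only when the stripe fits in one block but
       * would cross a block boundary at the current file offset.
       */
      static long paddingFor(long start, long stripeSize, long blockSize,
                             boolean addBlockPadding) {
        if (addBlockPadding
            && stripeSize < blockSize
            && (start % blockSize) + stripeSize > blockSize) {
          return blockSize - (start % blockSize);
        }
        return 0;
      }

      public static void main(String[] args) {
        long blockSize = 256L * 1024 * 1024;   // illustrative 256 MB block
        long stripeSize = 60L * 1024 * 1024;   // 60 MB stripe
        // 220 MB into the file: 220 + 60 = 280 MB straddles the 256 MB boundary,
        // so 36 MB of padding pushes the stripe to the next block.
        System.out.println(paddingFor(220L * 1024 * 1024, stripeSize, blockSize, true));
        // 100 MB into the file: the stripe fits before the boundary, no padding.
        System.out.println(paddingFor(100L * 1024 * 1024, stripeSize, blockSize, true));
      }
    }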
@@ -1704,7 +1750,8 @@ class WriterImpl implements Writer, Memo
.setName(entry.getKey()).setValue(entry.getValue()));
}
long startPosn = rawWriter.getPos();
- builder.build().writeTo(protobufWriter);
+ OrcProto.Footer footer = builder.build();
+ footer.writeTo(protobufWriter);
protobufWriter.flush();
writer.flush();
return (int) (rawWriter.getPos() - startPosn);
@@ -1716,8 +1763,8 @@ class WriterImpl implements Writer, Memo
.setCompression(writeCompressionKind(compress))
.setFooterLength(footerLength)
.setMagic(OrcFile.MAGIC)
- .addVersion(OrcFile.MAJOR_VERSION)
- .addVersion(OrcFile.MINOR_VERSION);
+ .addVersion(version.getMajor())
+ .addVersion(version.getMinor());
if (compress != CompressionKind.NONE) {
builder.setCompressionBlockSize(bufferSize);
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ColumnPrunerProcFactory.java Fri Aug 30 19:46:15 2013
@@ -68,7 +68,6 @@ import org.apache.hadoop.hive.ql.plan.Ma
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc.PTFExpressionDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
import org.apache.hadoop.hive.ql.plan.PlanUtils;
@@ -223,12 +222,6 @@ public final class ColumnPrunerProcFacto
}
}
}
- if ( tDef.getWindowExpressions() != null ) {
- for(WindowExpressionDef expr : tDef.getWindowExpressions()) {
- ExprNodeDesc exprNode = expr.getExprNode();
- Utilities.mergeUniqElems(prunedCols, exprNode.getCols());
- }
- }
if(tDef.getPartition() != null){
for(PTFExpressionDef col : tDef.getPartition().getExpressions()){
ExprNodeDesc exprNode = col.getExprNode();
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMRFileSink1.java Fri Aug 30 19:46:15 2013
@@ -125,7 +125,7 @@ public class GenMRFileSink1 implements N
// no need of merging if the move is to a local file system
MoveTask mvTask = (MoveTask) findMoveTask(mvTasks, fsOp);
- if (isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
+ if (mvTask != null && isInsertTable && hconf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER)) {
addStatsTask(fsOp, mvTask, currTask, parseCtx.getConf());
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/Optimizer.java Fri Aug 30 19:46:15 2013
@@ -110,6 +110,9 @@ public class Optimizer {
!HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVE_OPTIMIZE_SKEWJOIN_COMPILETIME)) {
transformations.add(new CorrelationOptimizer());
}
+ if (HiveConf.getFloatVar(hiveConf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0) {
+ transformations.add(new LimitPushdownOptimizer());
+ }
transformations.add(new SimpleFetchOptimizer()); // must be called last
}
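The new rule is gated purely on the pushdown memory fraction being positive. A minimal sketch of that gate; reading the value as a memory fraction for top-N buffering is my interpretation of the variable name, and setting it through ConfVars.varname is just one convenient way to do it in a test:

    import org.apache.hadoop.hive.conf.HiveConf;

    public class LimitPushdownGateSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Assumed to be a memory fraction for top-N buffering; <= 0 disables the rule.
        conf.setFloat(HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE.varname, 0.3f);
        boolean enabled =
            HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVELIMITPUSHDOWNMEMORYUSAGE) > 0;
        System.out.println(enabled);  // true -> LimitPushdownOptimizer is added
      }
    }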
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/PTFTranslator.java Fri Aug 30 19:46:15 2013
@@ -72,7 +72,6 @@ import org.apache.hadoop.hive.ql.plan.PT
import org.apache.hadoop.hive.ql.plan.PTFDesc.RangeBoundaryDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.ShapeDetails;
import org.apache.hadoop.hive.ql.plan.PTFDesc.ValueBoundaryDef;
-import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowExpressionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFrameDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowFunctionDef;
import org.apache.hadoop.hive.ql.plan.PTFDesc.WindowTableFunctionDef;
@@ -200,83 +199,26 @@ public class PTFTranslator {
/*
* set outputFromWdwFnProcessing
*/
- if (windowFunctions.size() > 0) {
- ArrayList<String> aliases = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
- for (WindowFunctionDef wFnDef : windowFunctions) {
- aliases.add(wFnDef.getAlias());
- if (wFnDef.isPivotResult()) {
- fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
- } else {
- fieldOIs.add(wFnDef.getOI());
- }
- }
- PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
- StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
- aliases, fieldOIs);
- tFn.setWdwProcessingOutputOI(wdwOutOI);
- RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef, false);
- ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
- wdwTFnDef.setOutputFromWdwFnProcessing(wdwOutShape);
- }
- else {
- wdwTFnDef.setOutputFromWdwFnProcessing(inpShape);
- }
-
- /*
- * process Wdw expressions
- */
- ShapeDetails wdwOutShape = wdwTFnDef.getOutputFromWdwFnProcessing();
- ArrayList<WindowExpressionDef> windowExpressions = new ArrayList<WindowExpressionDef>();
- if (wdwSpec.getWindowExpressions() != null) {
- for (WindowExpressionSpec expr : wdwSpec.getWindowExpressions()) {
- if (!(expr instanceof WindowFunctionSpec)) {
- try {
- PTFExpressionDef eDef = buildExpressionDef(wdwOutShape, expr.getExpression());
- WindowExpressionDef wdwEDef = new WindowExpressionDef(eDef);
- wdwEDef.setAlias(expr.getAlias());
- windowExpressions.add(wdwEDef);
- } catch (HiveException he) {
- throw new SemanticException(he);
- }
- }
+ ArrayList<String> aliases = new ArrayList<String>();
+ ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
+ for (WindowFunctionDef wFnDef : windowFunctions) {
+ aliases.add(wFnDef.getAlias());
+ if (wFnDef.isPivotResult()) {
+ fieldOIs.add(((ListObjectInspector) wFnDef.getOI()).getListElementObjectInspector());
+ } else {
+ fieldOIs.add(wFnDef.getOI());
}
- wdwTFnDef.setWindowExpressions(windowExpressions);
- }
-
- /*
- * set outputOI
- */
- if (windowExpressions.size() > 0) {
- ArrayList<String> aliases = new ArrayList<String>();
- ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
- for (WindowExpressionDef wEDef : windowExpressions) {
- aliases.add(wEDef.getAlias());
- fieldOIs.add(wEDef.getOI());
- }
- PTFTranslator.addInputColumnsToList(wdwOutShape, aliases, fieldOIs);
- StructObjectInspector outOI = ObjectInspectorFactory.getStandardStructObjectInspector(
- aliases, fieldOIs);
- RowResolver outRR = buildRowResolverForWindowing(wdwTFnDef, true);
- ShapeDetails outShape = setupShape(outOI, null, outRR);
- wdwTFnDef.setOutputShape(outShape);
- }
- else {
- wdwTFnDef.setOutputShape(copyShape(wdwOutShape));
}
+ PTFTranslator.addInputColumnsToList(inpShape, aliases, fieldOIs);
+ StructObjectInspector wdwOutOI = ObjectInspectorFactory.getStandardStructObjectInspector(
+ aliases, fieldOIs);
+ tFn.setWdwProcessingOutputOI(wdwOutOI);
+ RowResolver wdwOutRR = buildRowResolverForWindowing(wdwTFnDef);
+ ShapeDetails wdwOutShape = setupShape(wdwOutOI, null, wdwOutRR);
+ wdwTFnDef.setOutputShape(wdwOutShape);
tFn.setupOutputOI();
- /*
- * If we have windowExpressions then we convert to Std. Object to process;
- * we just stream these rows; no need to put in an output Partition.
- */
- if (windowExpressions.size() > 0) {
- StructObjectInspector oi = (StructObjectInspector)
- ObjectInspectorUtils.getStandardObjectInspector(wdwTFnDef.getOutputShape().getOI());
- wdwTFnDef.getOutputShape().setOI(oi);
- }
-
return ptfDesc;
}
@@ -949,23 +891,10 @@ public class PTFTranslator {
return rwsch;
}
- protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def,
- boolean addWdwExprs) throws SemanticException {
+ protected RowResolver buildRowResolverForWindowing(WindowTableFunctionDef def)
+ throws SemanticException {
RowResolver rr = new RowResolver();
HashMap<String, WindowExpressionSpec> aliasToExprMap = windowingSpec.getAliasToWdwExpr();
- /*
- * add Window Expressions
- */
- if (addWdwExprs) {
- for (WindowExpressionDef wEDef : def.getWindowExpressions()) {
- ASTNode ast = aliasToExprMap.get(wEDef.getAlias()).getExpression();
- ColumnInfo cInfo = new ColumnInfo(wEDef.getAlias(),
- TypeInfoUtils.getTypeInfoFromObjectInspector(wEDef.getOI()),
- null,
- false);
- rr.putExpression(ast, cInfo);
- }
- }
/*
* add Window Functions
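With window expressions removed, the translator builds the windowing output shape in a single pass: window function aliases first, then the pass-through input columns, all fed to a standard struct ObjectInspector. A small hypothetical sketch of that construction (the column names and types are made up for illustration):

    import java.util.ArrayList;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class WindowingShapeSketch {
      public static void main(String[] args) {
        // Hypothetical output of two window functions, r1 and s1; the translator
        // above would then append the input shape's columns via
        // addInputColumnsToList(inpShape, aliases, fieldOIs).
        ArrayList<String> aliases = new ArrayList<String>();
        ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
        aliases.add("r1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
        aliases.add("s1");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaDoubleObjectInspector);
        StructObjectInspector wdwOutOI =
            ObjectInspectorFactory.getStandardStructObjectInspector(aliases, fieldOIs);
        System.out.println(wdwOutOI.getTypeName());  // struct<r1:int,s1:double>
      }
    }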
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/WindowingSpec.java Fri Aug 30 19:46:15 2013
@@ -67,19 +67,6 @@ public class WindowingSpec {
windowSpecs.put(name, wdwSpec);
}
- public void addExpression(ASTNode expr, String alias) {
- windowExpressions = windowExpressions == null ?
- new ArrayList<WindowExpressionSpec>() : windowExpressions;
- aliasToWdwExpr = aliasToWdwExpr == null ?
- new HashMap<String, WindowExpressionSpec>() : aliasToWdwExpr;
- WindowExpressionSpec wExprSpec = new WindowExpressionSpec();
- wExprSpec.setAlias(alias);
- wExprSpec.setExpression(expr);
-
- windowExpressions.add(wExprSpec);
- aliasToWdwExpr.put(alias, wExprSpec);
- }
-
public void addWindowFunction(WindowFunctionSpec wFn) {
windowExpressions = windowExpressions == null ?
new ArrayList<WindowExpressionSpec>() : windowExpressions;
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java?rev=1519056&r1=1519055&r2=1519056&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/plan/PTFDesc.java Fri Aug 30 19:46:15 2013
@@ -239,28 +239,8 @@ public class PTFDesc extends AbstractOpe
}
public static class WindowTableFunctionDef extends PartitionedTableFunctionDef {
- ArrayList<WindowExpressionDef> windowExpressions;
ArrayList<WindowFunctionDef> windowFunctions;
- /*
- * this shape omits the non WdwFunction Expressions. Expr Evaluators for the Window Expressions is based on this
- * shape, so they can refer to the Wdw Function values.
- * @note: this will eventually be removed, as plan is to push Wdw expression processing to separate Select Op after
- * PTF Op.
- */
- ShapeDetails outputFromWdwFnProcessing;
-
- public ArrayList<WindowExpressionDef> getWindowExpressions() {
- return windowExpressions;
- }
- public void setWindowExpressions(ArrayList<WindowExpressionDef> windowExpressions) {
- this.windowExpressions = windowExpressions;
- }
- public ShapeDetails getOutputFromWdwFnProcessing() {
- return outputFromWdwFnProcessing;
- }
- public void setOutputFromWdwFnProcessing(ShapeDetails outputFromWdwFnProcessing) {
- this.outputFromWdwFnProcessing = outputFromWdwFnProcessing;
- }
+
public ArrayList<WindowFunctionDef> getWindowFunctions() {
return windowFunctions;
}