Posted to commits@hive.apache.org by ha...@apache.org on 2013/10/01 05:12:24 UTC
svn commit: r1527858 [1/2] - in /hive/trunk: ./
ant/src/org/apache/hadoop/hive/ant/
common/src/java/org/apache/hadoop/hive/conf/ conf/ data/files/ ql/
ql/src/gen/vectorization/ ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/java/org/apache/hadoop/h...
Author: hashutosh
Date: Tue Oct 1 03:12:23 2013
New Revision: 1527858
URL: http://svn.apache.org/r1527858
Log:
HIVE-5283 : Merge vectorization branch to trunk (Jitendra Nath Pandey via Ashutosh Chauhan)
Added:
hive/trunk/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
- copied unchanged from r1527856, hive/branches/vectorization/ant/src/org/apache/hadoop/hive/ant/GenVectorCode.java
hive/trunk/ant/src/org/apache/hadoop/hive/ant/GenVectorTestCode.java
- copied unchanged from r1527856, hive/branches/vectorization/ant/src/org/apache/hadoop/hive/ant/GenVectorTestCode.java
hive/trunk/data/files/alltypesorc
- copied unchanged from r1527856, hive/branches/vectorization/data/files/alltypesorc
hive/trunk/ql/src/gen/vectorization/
- copied from r1527856, hive/branches/vectorization/ql/src/gen/vectorization/
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/
- copied from r1527856, hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/CommonRCFileInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/VectorizedRCFileRecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/Vectorizer.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/
- copied from r1527856, hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java
hive/trunk/ql/src/test/queries/clientpositive/vectorization_short_regress.q
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/test/queries/clientpositive/vectorization_short_regress.q
hive/trunk/ql/src/test/queries/clientpositive/vectorized_rcfile_columnar.q
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/test/queries/clientpositive/vectorized_rcfile_columnar.q
hive/trunk/ql/src/test/results/clientpositive/vectorization_short_regress.q.out
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/test/results/clientpositive/vectorization_short_regress.q.out
hive/trunk/ql/src/test/results/clientpositive/vectorized_rcfile_columnar.q.out
- copied unchanged from r1527856, hive/branches/vectorization/ql/src/test/results/clientpositive/vectorized_rcfile_columnar.q.out
Modified:
hive/trunk/ (props changed)
hive/trunk/build-common.xml
hive/trunk/build.xml
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/ql/build.xml
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFHex.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/util/JavaDataModel.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
hive/trunk/ql/src/test/results/clientpositive/add_part_exist.q.out
hive/trunk/ql/src/test/results/clientpositive/alter1.q.out
hive/trunk/ql/src/test/results/clientpositive/alter2.q.out
hive/trunk/ql/src/test/results/clientpositive/alter3.q.out
hive/trunk/ql/src/test/results/clientpositive/alter4.q.out
hive/trunk/ql/src/test/results/clientpositive/alter5.q.out
hive/trunk/ql/src/test/results/clientpositive/alter_index.q.out
hive/trunk/ql/src/test/results/clientpositive/alter_rename_partition.q.out
hive/trunk/ql/src/test/results/clientpositive/describe_table_json.q.out
hive/trunk/ql/src/test/results/clientpositive/index_creation.q.out
hive/trunk/ql/src/test/results/clientpositive/input2.q.out
hive/trunk/ql/src/test/results/clientpositive/input3.q.out
hive/trunk/ql/src/test/results/clientpositive/input4.q.out
hive/trunk/ql/src/test/results/clientpositive/plan_json.q.out
hive/trunk/ql/src/test/results/clientpositive/rename_column.q.out
hive/trunk/ql/src/test/results/clientpositive/show_tables.q.out
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java
hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyUtils.java
Propchange: hive/trunk/
------------------------------------------------------------------------------
Merged /hive/branches/vectorization:r1466908-1527856
Modified: hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hive/trunk/build-common.xml?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/build-common.xml (original)
+++ hive/trunk/build-common.xml Tue Oct 1 03:12:23 2013
@@ -485,7 +485,7 @@
<batchtest todir="${test.build.dir}" unless="testcase">
<fileset dir="${test.build.classes}"
includes="**/${test.include}.class"
- excludes="**/TestSerDe.class,**/TestHiveMetaStore.class,**/TestBeeLineDriver.class,**/TestHiveServer2Concurrency.class,**/*$*.class,${test.junit.exclude}" />
+ excludes="**/ql/exec/vector/util/*.class,**/ql/exec/vector/udf/legacy/*.class,**/ql/exec/vector/udf/generic/*.class,**/TestSerDe.class,**/TestHiveMetaStore.class,**/TestBeeLineDriver.class,**/TestHiveServer2Concurrency.class,**/*$*.class,${test.junit.exclude}" />
</batchtest>
<batchtest todir="${test.build.dir}" if="testcase">
<fileset dir="${test.build.classes}" includes="**/${testcase}.class"/>
Modified: hive/trunk/build.xml
URL: http://svn.apache.org/viewvc/hive/trunk/build.xml?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/build.xml (original)
+++ hive/trunk/build.xml Tue Oct 1 03:12:23 2013
@@ -263,6 +263,12 @@
<target name="init" depends="ivy-init-antlib,deploy-ant-tasks">
<echo message="Project: ${ant.project.name}"/>
<iterate target="init" iterate="${iterate.hive.all}"/>
+
+ <mkdir dir="${build.dir.hive}/ql/gen/vector/org/apache/hadoop/hive/ql/exec/vector/expressions/gen"/>
+ <mkdir dir="${build.dir.hive}/ql/gen/vector/org/apache/hadoop/hive/ql/exec/vector/expressions/aggregates/gen"/>
+ <mkdir dir="${build.dir.hive}/ql/test/src/org/apache/hadoop/hive/ql/exec/vector/expressions/gen"/>
+ <vectorcodegen templateBaseDir="${hive.root}/ql/src/gen/vectorization/" buildDir="${build.dir.hive}" />
+
</target>
<target name="test-init">
@@ -283,8 +289,13 @@
<subant target="jar">
<fileset dir="." includes="ant/build.xml"/>
</subant>
+
<taskdef name="getversionpref" classname="org.apache.hadoop.hive.ant.GetVersionPref"
classpath="${build.dir.hive}/anttasks/hive-anttasks-${version}.jar"/>
+
+ <taskdef name="vectorcodegen" classname="org.apache.hadoop.hive.ant.GenVectorCode"
+ classpath="${build.dir.hive}/anttasks/hive-anttasks-${version}.jar"/>
+
</target>
@@ -742,6 +753,7 @@
<packageset dir="ql/src/test"/>
<packageset dir="ql/src/gen/thrift/gen-javabean"/>
<packageset dir="${build.dir.hive}/ql/gen/antlr/gen-java"/>
+ <packageset dir="${build.dir.hive}/ql/gen/vector"/>
<packageset dir="shims/src/common/java"/>
<link href="${javadoc.link.java}"/>
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Tue Oct 1 03:12:23 2013
@@ -814,6 +814,9 @@ public class HiveConf extends Configurat
// Whether to show the unquoted partition names in query results.
HIVE_DECODE_PARTITION_NAME("hive.decode.partition.name", false),
+ //Vectorization enabled
+ HIVE_VECTORIZATION_ENABLED("hive.vectorized.execution.enabled", false),
+
HIVE_TYPE_CHECK_ON_INSERT("hive.typecheck.on.insert", true),
;
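
The new flag is read and written through the standard HiveConf accessors. A minimal
sketch (the class and method names here are illustrative, not part of the commit; the
accessor calls mirror the FetchOperator change further below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.conf.HiveConf;

    // Illustrative helper: check and clear the new
    // hive.vectorized.execution.enabled flag via the HiveConf accessors.
    public final class VectorizationFlag {
      public static boolean isEnabled(Configuration conf) {
        return HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED);
      }

      public static void disable(Configuration conf) {
        HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
      }
    }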
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Tue Oct 1 03:12:23 2013
@@ -2011,6 +2011,15 @@
</property>
<property>
+ <name>hive.vectorized.execution.enabled</name>
+ <value>false</value>
+ <description>
+ This flag should be set to true to enable vectorized mode of query execution.
+ The default value is false.
+ </description>
+</property>
+
+<property>
<name>hive.metastore.schema.verification</name>
<value>false</value>
<description>
@@ -2022,5 +2031,4 @@
</description>
</property>
-
</configuration>
Modified: hive/trunk/ql/build.xml
URL: http://svn.apache.org/viewvc/hive/trunk/ql/build.xml?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/build.xml (original)
+++ hive/trunk/ql/build.xml Tue Oct 1 03:12:23 2013
@@ -190,7 +190,7 @@
<echo message="Project: ${ant.project.name}"/>
<javac
encoding="${build.encoding}"
- srcdir="${src.dir}:${basedir}/src/gen/thrift/gen-javabean:${build.dir}/gen/antlr/gen-java:${protobuf.build.dir}"
+ srcdir="${src.dir}:${basedir}/src/gen/thrift/gen-javabean:${build.dir}/gen/antlr/gen-java:${protobuf.build.dir}:${build.dir}/gen/vector"
includes="**/*.java"
destdir="${build.classes}"
debug="${javac.debug}"
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java Tue Oct 1 03:12:23 2013
@@ -373,6 +373,12 @@ public class FetchOperator implements Se
job.set("mapred.input.dir", org.apache.hadoop.util.StringUtils.escapeString(currPath
.toString()));
+ // The fetch operator is not vectorized, so turn the vectorization flag off here
+ // so that a non-vectorized record reader is created below.
+ if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+ HiveConf.setBoolVar(job, HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, false);
+ }
+
PartitionDesc partDesc;
if (currTbl == null) {
partDesc = currPart;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Tue Oct 1 03:12:23 2013
@@ -90,7 +90,7 @@ public class FileSinkOperator extends Te
protected transient int maxPartitions;
protected transient ListBucketingCtx lbCtx;
protected transient boolean isSkewedStoredAsSubDirectories;
- private transient boolean statsCollectRawDataSize;
+ protected transient boolean statsCollectRawDataSize;
private transient boolean[] statsFromRecordWriter;
private transient boolean isCollectRWStats;
@@ -215,6 +215,10 @@ public class FileSinkOperator extends Te
}
}
}
+
+ public Stat getStat() {
+ return stat;
+ }
} // class FSPaths
private static final long serialVersionUID = 1L;
@@ -222,7 +226,7 @@ public class FileSinkOperator extends Te
protected transient Serializer serializer;
protected transient BytesWritable commonKey = new BytesWritable();
protected transient TableIdEnum tabIdEnum = null;
- private transient LongWritable row_count;
+ protected transient LongWritable row_count;
private transient boolean isNativeTable = true;
/**
@@ -231,17 +235,17 @@ public class FileSinkOperator extends Te
* each reducer can write 10 files - this way we effectively get 1000 files.
*/
private transient ExprNodeEvaluator[] partitionEval;
- private transient int totalFiles;
+ protected transient int totalFiles;
private transient int numFiles;
- private transient boolean multiFileSpray;
- private transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
+ protected transient boolean multiFileSpray;
+ protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
private transient ObjectInspector[] partitionObjectInspectors;
- private transient HivePartitioner<HiveKey, Object> prtner;
- private transient final HiveKey key = new HiveKey();
+ protected transient HivePartitioner<HiveKey, Object> prtner;
+ protected transient final HiveKey key = new HiveKey();
private transient Configuration hconf;
- private transient FSPaths fsp;
- private transient boolean bDynParts;
+ protected transient FSPaths fsp;
+ protected transient boolean bDynParts;
private transient SubStructObjectInspector subSetOI;
private transient int timeOut; // JT timeout in msec.
private transient long lastProgressReport = System.currentTimeMillis();
@@ -273,7 +277,7 @@ public class FileSinkOperator extends Te
Class<? extends Writable> outputClass;
String taskId;
- private boolean filesCreated = false;
+ protected boolean filesCreated = false;
private void initializeSpecPath() {
// For a query of the type:
@@ -427,7 +431,7 @@ public class FileSinkOperator extends Te
}
}
- private void createBucketFiles(FSPaths fsp) throws HiveException {
+ protected void createBucketFiles(FSPaths fsp) throws HiveException {
try {
int filesIdx = 0;
Set<Integer> seenBuckets = new HashSet<Integer>();
@@ -541,7 +545,7 @@ public class FileSinkOperator extends Te
*
* @return true if a new progress update is reported, false otherwise.
*/
- private boolean updateProgress() {
+ protected boolean updateProgress() {
if (reporter != null &&
(System.currentTimeMillis() - lastProgressReport) > timeOut) {
reporter.progress();
@@ -552,7 +556,7 @@ public class FileSinkOperator extends Te
}
}
- Writable recordValue;
+ protected Writable recordValue;
@Override
public void processOp(Object row, int tag) throws HiveException {
@@ -674,7 +678,7 @@ public class FileSinkOperator extends Te
* @return
* @throws HiveException
*/
- private FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
+ protected FSPaths lookupListBucketingPaths(String lbDirName) throws HiveException {
FSPaths fsp2 = valToPaths.get(lbDirName);
if (fsp2 == null) {
fsp2 = createNewPaths(lbDirName);
@@ -712,7 +716,7 @@ public class FileSinkOperator extends Te
* @param row row to process.
* @return directory name.
*/
- private String generateListBucketingDirName(Object row) {
+ protected String generateListBucketingDirName(Object row) {
if (!this.isSkewedStoredAsSubDirectories) {
return null;
}
@@ -753,7 +757,7 @@ public class FileSinkOperator extends Te
return lbDirName;
}
- private FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
+ protected FSPaths getDynOutPaths(List<String> row, String lbDirName) throws HiveException {
FSPaths fp;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Tue Oct 1 03:12:23 2013
@@ -46,13 +46,14 @@ public class FilterOperator extends Oper
FILTERED, PASSED
}
- private final transient LongWritable filtered_count, passed_count;
+ protected final transient LongWritable filtered_count;
+ protected final transient LongWritable passed_count;
private transient ExprNodeEvaluator conditionEvaluator;
private transient PrimitiveObjectInspector conditionInspector;
private transient int consecutiveFails;
private transient int consecutiveSearches;
private transient IOContext ioContext;
- transient int heartbeatInterval;
+ protected transient int heartbeatInterval;
public FilterOperator() {
super();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/GroupByOperator.java Tue Oct 1 03:12:23 2013
@@ -141,8 +141,16 @@ public class GroupByOperator extends Ope
transient StructObjectInspector newKeyObjectInspector;
transient StructObjectInspector currentKeyObjectInspector;
public static MemoryMXBean memoryMXBean;
- private long maxMemory;
- private float memoryThreshold;
+
+ /**
+ * Total amount of memory allowed for JVM heap.
+ */
+ protected long maxMemory;
+
+ /**
+ * configure percent of memory threshold usable by QP.
+ */
+ protected float memoryThreshold;
private boolean groupingSetsPresent;
private int groupingSetsPosition;
@@ -159,10 +167,18 @@ public class GroupByOperator extends Ope
transient List<Field>[] aggrPositions;
transient int fixedRowSize;
- transient long maxHashTblMemory;
+
+ /**
+ * Max memory usable by the hashtable before it should flush.
+ */
+ protected transient long maxHashTblMemory;
transient int totalVariableSize;
transient int numEntriesVarSize;
- transient int numEntriesHashTable;
+
+ /**
+ * Current number of entries in the hash table.
+ */
+ protected transient int numEntriesHashTable;
transient int countAfterReport; // report or forward
transient int heartbeatInterval;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/KeyWrapper.java Tue Oct 1 03:12:23 2013
@@ -22,9 +22,9 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
public abstract class KeyWrapper {
- abstract void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException;
- abstract void setHashKey();
- abstract KeyWrapper copyKey();
- abstract void copyKey(KeyWrapper oldWrapper);
- abstract Object[] getKeyArray();
+ public abstract void getNewKey(Object row, ObjectInspector rowInspector) throws HiveException;
+ public abstract void setHashKey();
+ public abstract KeyWrapper copyKey();
+ public abstract void copyKey(KeyWrapper oldWrapper);
+ public abstract Object[] getKeyArray();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Tue Oct 1 03:12:23 2013
@@ -101,7 +101,7 @@ public abstract class Operator<T extends
protected transient State state = State.UNINIT;
- static transient boolean fatalError = false; // fatalError is shared across
+ protected static transient boolean fatalError = false; // fatalError is shared across
// all operators
static {
@@ -1438,6 +1438,60 @@ public abstract class Operator<T extends
return ret;
}
+ /**
+ * Clones only the operator. The children and parent are set
+ * to null.
+ * @return Cloned operator
+ * @throws CloneNotSupportedException
+ */
+ public Operator<? extends OperatorDesc> cloneOp() throws CloneNotSupportedException {
+ T descClone = (T) conf.clone();
+ Operator<? extends OperatorDesc> ret =
+ (Operator<? extends OperatorDesc>) OperatorFactory.getAndMakeChild(
+ descClone, getSchema());
+ return ret;
+ }
+
+ /**
+ * Recursively clones all the children of the tree,
+ * Fixes the pointers to children, parents and the pointers to itself coming from the children.
+ * It does not fix the pointers to itself coming from parents, parents continue to point to
+ * the original child.
+ * @return Cloned operator
+ * @throws CloneNotSupportedException
+ */
+ public Operator<? extends OperatorDesc> cloneRecursiveChildren()
+ throws CloneNotSupportedException {
+ Operator<? extends OperatorDesc> newOp = this.cloneOp();
+ newOp.setParentOperators(this.parentOperators);
+ // Fix parent in all children
+ if (this.getChildOperators() == null) {
+ newOp.setChildOperators(null);
+ return newOp;
+ }
+ List<Operator<? extends OperatorDesc>> newChildren =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+
+ for (Operator<? extends OperatorDesc> childOp : this.getChildOperators()) {
+ List<Operator<? extends OperatorDesc>> parentList =
+ new ArrayList<Operator<? extends OperatorDesc>>();
+ for (Operator<? extends OperatorDesc> parent : childOp.getParentOperators()) {
+ if (parent.equals(this)) {
+ parentList.add(newOp);
+ } else {
+ parentList.add(parent);
+ }
+ }
+ // Recursively clone the children
+ Operator<? extends OperatorDesc> clonedChildOp = childOp.cloneRecursiveChildren();
+ clonedChildOp.setParentOperators(parentList);
+ }
+
+ newOp.setChildOperators(newChildren);
+ return newOp;
+ }
+
+
/*
* True only for operators which produce at most 1 output row per input
* row to it. This will allow the output column names to be directly
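
A short usage sketch for the new cloning helpers (origOp is an assumed in-scope
operator; as the javadoc above notes, parents of the original keep pointing at the
original after cloning):

    // Fragment, assuming origOp is an Operator<? extends OperatorDesc> in a plan.
    Operator<? extends OperatorDesc> cloned = origOp.cloneRecursiveChildren();
    // cloned shares origOp's parent list and gets freshly cloned children,
    // but origOp's parents still reference origOp rather than the clone.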
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/OperatorFactory.java Tue Oct 1 03:12:23 2013
@@ -22,12 +22,17 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFileSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorFilterOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorGroupByOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorSelectOperator;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
import org.apache.hadoop.hive.ql.plan.CollectDesc;
-import org.apache.hadoop.hive.ql.plan.MuxDesc;
import org.apache.hadoop.hive.ql.plan.DemuxDesc;
import org.apache.hadoop.hive.ql.plan.DummyStoreDesc;
-import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExtractDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.FilterDesc;
import org.apache.hadoop.hive.ql.plan.ForwardDesc;
@@ -39,6 +44,7 @@ import org.apache.hadoop.hive.ql.plan.La
import org.apache.hadoop.hive.ql.plan.LateralViewJoinDesc;
import org.apache.hadoop.hive.ql.plan.LimitDesc;
import org.apache.hadoop.hive.ql.plan.MapJoinDesc;
+import org.apache.hadoop.hive.ql.plan.MuxDesc;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PTFDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
@@ -106,6 +112,38 @@ public final class OperatorFactory {
MuxOperator.class));
}
+ public static ArrayList<OpTuple> vectorOpvec;
+ static {
+ vectorOpvec = new ArrayList<OpTuple>();
+ vectorOpvec.add(new OpTuple<SelectDesc>(SelectDesc.class, VectorSelectOperator.class));
+ vectorOpvec.add(new OpTuple<GroupByDesc>(GroupByDesc.class, VectorGroupByOperator.class));
+ vectorOpvec.add(new OpTuple<ReduceSinkDesc>(ReduceSinkDesc.class,
+ VectorReduceSinkOperator.class));
+ vectorOpvec.add(new OpTuple<FileSinkDesc>(FileSinkDesc.class, VectorFileSinkOperator.class));
+ vectorOpvec.add(new OpTuple<FilterDesc>(FilterDesc.class, VectorFilterOperator.class));
+ }
+
+ public static <T extends OperatorDesc> Operator<T> getVectorOperator(T conf,
+ VectorizationContext vContext) {
+ Class<T> descClass = (Class<T>) conf.getClass();
+ for (OpTuple o : vectorOpvec) {
+ if (o.descClass == descClass) {
+ try {
+ Operator<T> op = (Operator<T>) o.opClass.getDeclaredConstructor(
+ VectorizationContext.class, OperatorDesc.class).newInstance(
+ vContext, conf);
+ op.initializeCounters();
+ return op;
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ throw new RuntimeException("No vector operator for descriptor class "
+ + descClass.getName());
+ }
+
public static <T extends OperatorDesc> Operator<T> get(Class<T> opClass) {
for (OpTuple o : opvec) {
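
A hedged sketch of how a caller obtains a vectorized operator from this registry
(the wrapper class is hypothetical; Vectorizer.java is the real call site):

    import org.apache.hadoop.hive.ql.exec.Operator;
    import org.apache.hadoop.hive.ql.exec.OperatorFactory;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizationContext;
    import org.apache.hadoop.hive.ql.plan.FilterDesc;

    public final class VectorOpLookup {
      // Looks up FilterDesc in vectorOpvec and reflectively constructs
      // VectorFilterOperator(vContext, desc); a RuntimeException is thrown
      // if no vector operator is registered for the descriptor class.
      public static Operator<FilterDesc> vectorize(FilterDesc desc,
                                                   VectorizationContext vContext) {
        return OperatorFactory.getVectorOperator(desc, vContext);
      }
    }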
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Tue Oct 1 03:12:23 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.UnionObjectInspector;
import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.Text;
/**
@@ -72,11 +73,11 @@ public class ReduceSinkOperator extends
// TODO: we use MetadataTypedColumnsetSerDe for now, till DynamicSerDe is
// ready
- transient Serializer keySerializer;
- transient boolean keyIsText;
- transient Serializer valueSerializer;
+ protected transient Serializer keySerializer;
+ protected transient boolean keyIsText;
+ protected transient Serializer valueSerializer;
transient int tag;
- transient byte[] tagByte = new byte[1];
+ protected transient byte[] tagByte = new byte[1];
transient protected int numDistributionKeys;
transient protected int numDistinctExprs;
transient String inputAlias; // input alias of this RS for join (used for PPD)
@@ -163,12 +164,15 @@ public class ReduceSinkOperator extends
}
transient InspectableObject tempInspectableObject = new InspectableObject();
- transient HiveKey keyWritable = new HiveKey();
+ protected transient HiveKey keyWritable = new HiveKey();
+ protected transient Writable value;
transient StructObjectInspector keyObjectInspector;
transient StructObjectInspector valueObjectInspector;
transient ObjectInspector[] partitionObjectInspectors;
+ protected transient Object[] cachedValues;
+ protected transient List<List<Integer>> distinctColIndices;
/**
* This two dimensional array holds key data and a corresponding Union object
* which contains the tag identifying the aggregate expression for distinct columns.
@@ -183,13 +187,9 @@ public class ReduceSinkOperator extends
* in this case, child GBY evaluates distict values with expression like KEY.col2:0.dist1
* see {@link ExprNodeColumnEvaluator}
*/
- transient Object[][] cachedKeys;
- transient Object[] cachedValues;
- transient List<List<Integer>> distinctColIndices;
-
+ protected transient Object[][] cachedKeys;
boolean firstRow;
-
- transient Random random;
+ protected transient Random random;
/**
* Initializes array of ExprNodeEvaluator. Adds Union field for distinct
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java Tue Oct 1 03:12:23 2013
@@ -569,7 +569,7 @@ public final class Utilities {
}
}
- private static Path getPlanPath(Configuration conf) {
+ public static Path getPlanPath(Configuration conf) {
String plan = HiveConf.getVar(conf, HiveConf.ConfVars.PLAN);
if (plan != null && !plan.isEmpty()) {
return new Path(plan);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecDriver.java Tue Oct 1 03:12:23 2013
@@ -237,6 +237,7 @@ public class ExecDriver extends Task<Map
ShimLoader.getHadoopShims().prepareJobOutput(job);
//See the javadoc on HiveOutputFormatImpl and HadoopShims.prepareJobOutput()
job.setOutputFormat(HiveOutputFormatImpl.class);
+
job.setMapperClass(ExecMapper.class);
job.setMapOutputKeyClass(HiveKey.class);
@@ -828,3 +829,4 @@ public class ExecDriver extends Task<Map
}
}
}
+
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/BitFieldReader.java Tue Oct 1 03:12:23 2013
@@ -20,8 +20,10 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
class BitFieldReader {
- private RunLengthByteReader input;
+ private final RunLengthByteReader input;
private final int bitSize;
private int current;
private int bitsLeft;
@@ -60,6 +62,30 @@ class BitFieldReader {
return result & mask;
}
+ void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException {
+
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value of null for int types in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+
+ // The default value for nulls in Vectorization for int types is 1
+ // and given that a non-null value can also be 1, we need to check isNull as well
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
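
A tiny illustration of why the isRepeating check above must consult isNull as well
as the values (LongColumnVector fields as used throughout this patch):

    // Both rows carry the value 1, but row 0 is only the null placeholder
    // (nulls default to 1 for integer types), so the column is NOT repeating.
    LongColumnVector v = new LongColumnVector();
    v.vector[0] = 1; v.isNull[0] = true;   // null, stored as the default 1
    v.vector[1] = 1; v.isNull[1] = false;  // a genuine value 1
    // equal values, different isNull flags -> isRepeating must be false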
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/DynamicByteArray.java Tue Oct 1 03:12:23 2013
@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
-import org.apache.hadoop.io.Text;
-
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import org.apache.hadoop.io.Text;
+
/**
* A class that is a growable array of bytes. Growth is managed in terms of
* chunks that are allocated when needed.
@@ -237,6 +237,7 @@ final class DynamicByteArray {
}
}
+ @Override
public String toString() {
int i;
StringBuilder sb = new StringBuilder(length * 3);
@@ -268,10 +269,35 @@ final class DynamicByteArray {
}
/**
+ * Gets all the bytes of the array.
+ *
+ * @return Bytes of the array
+ */
+ public byte[] get() {
+ byte[] result = null;
+ if (length > 0) {
+ int currentChunk = 0;
+ int currentOffset = 0;
+ int currentLength = Math.min(length, chunkSize);
+ int destOffset = 0;
+ result = new byte[length];
+ int totalLength = length;
+ while (totalLength > 0) {
+ System.arraycopy(data[currentChunk], currentOffset, result, destOffset, currentLength);
+ destOffset += currentLength;
+ totalLength -= currentLength;
+ currentChunk += 1;
+ currentOffset = 0;
+ currentLength = Math.min(totalLength, chunkSize - currentOffset);
+ }
+ }
+ return result;
+ }
+
+ /**
* Get the size of the buffers.
*/
public long getSizeInBytes() {
return initializedChunks * chunkSize;
}
}
-
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/IntegerReader.java Tue Oct 1 03:12:23 2013
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.io.orc;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
/**
* Interface for reading integers.
@@ -52,4 +53,12 @@ interface IntegerReader {
* @throws IOException
*/
long next() throws IOException;
+
+ /**
+ * Fill the given column vector with the next available values.
+ * @throws IOException
+ */
+ void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java Tue Oct 1 03:12:23 2013
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -38,6 +37,8 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.InputFormatChecker;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
@@ -54,12 +55,13 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.StringUtils;
-
/**
* A MapReduce/Hive input format for ORC files.
*/
-public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
- InputFormatChecker {
+public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>,
+ InputFormatChecker, VectorizedInputFormatInterface {
+
+ VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
private static final Log LOG = LogFactory.getLog(OrcInputFormat.class);
static final String MIN_SPLIT_SIZE = "mapred.min.split.size";
@@ -85,6 +87,7 @@ public class OrcInputFormat implements I
private final int numColumns;
private float progress = 0.0f;
+
OrcRecordReader(Reader file, Configuration conf,
long offset, long length) throws IOException {
String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR);
@@ -219,6 +222,12 @@ public class OrcInputFormat implements I
public RecordReader<NullWritable, OrcStruct>
getRecordReader(InputSplit inputSplit, JobConf conf,
Reporter reporter) throws IOException {
+ if (isVectorMode(conf)) {
+ RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(inputSplit, conf,
+ reporter);
+ return (RecordReader) vorr;
+ }
+
FileSplit fileSplit = (FileSplit) inputSplit;
Path path = fileSplit.getPath();
FileSystem fs = path.getFileSystem(conf);
@@ -231,6 +240,11 @@ public class OrcInputFormat implements I
public boolean validateInput(FileSystem fs, HiveConf conf,
ArrayList<FileStatus> files
) throws IOException {
+
+ if (isVectorMode(conf)) {
+ return voif.validateInput(fs, conf, files);
+ }
+
if (files.size() <= 0) {
return false;
}
@@ -244,6 +258,14 @@ public class OrcInputFormat implements I
return true;
}
+ private boolean isVectorMode(Configuration conf) {
+ if (Utilities.getPlanPath(conf) != null && Utilities
+ .getMapRedWork(conf).getMapWork().getVectorMode()) {
+ return true;
+ }
+ return false;
+ }
+
/**
* Get the list of input {@link Path}s for the map-reduce job.
*
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Tue Oct 1 03:12:23 2013
@@ -17,9 +17,16 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -29,26 +36,22 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Properties;
-
/**
* A serde class for ORC.
* It transparently passes the object to/from the ORC file reader/writer.
*/
-public class OrcSerde implements SerDe {
+public class OrcSerde implements SerDe, VectorizedSerde {
private static final Log LOG = LogFactory.getLog(OrcSerde.class);
private final OrcSerdeRow row = new OrcSerdeRow();
private ObjectInspector inspector = null;
+ private VectorizedOrcSerde vos = null;
+
final class OrcSerdeRow implements Writable {
- private Object realRow;
- private ObjectInspector inspector;
+ Object realRow;
+ ObjectInspector inspector;
@Override
public void write(DataOutput dataOutput) throws IOException {
@@ -79,7 +82,7 @@ public class OrcSerde implements SerDe {
// Parse the configuration parameters
ArrayList<String> columnNames = new ArrayList<String>();
if (columnNameProperty != null && columnNameProperty.length() > 0) {
- for(String name: columnNameProperty.split(",")) {
+ for (String name : columnNameProperty.split(",")) {
columnNames.add(name);
}
}
@@ -96,7 +99,7 @@ public class OrcSerde implements SerDe {
}
ArrayList<TypeInfo> fieldTypes =
- TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
StructTypeInfo rootType = new StructTypeInfo();
rootType.setAllStructFieldNames(columnNames);
rootType.setAllStructFieldTypeInfos(fieldTypes);
@@ -128,6 +131,7 @@ public class OrcSerde implements SerDe {
/**
* Always returns null, since serialized size doesn't make sense in the
* context of ORC files.
+ *
* @return null
*/
@Override
@@ -135,4 +139,18 @@ public class OrcSerde implements SerDe {
return null;
}
+ @Override
+ public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+ throws SerDeException {
+ if (vos == null) {
+ vos = new VectorizedOrcSerde(objInspector);
+ }
+ return vos.serialize(vrg, objInspector);
+ }
+
+ @Override
+ public void deserializeVector(Object rowBlob, int rowsInBatch, VectorizedRowBatch reuseBatch)
+ throws SerDeException {
+ // nothing to do here
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReader.java Tue Oct 1 03:12:23 2013
@@ -19,6 +19,8 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+
/**
* A row-by-row iterator for ORC files.
*/
@@ -39,6 +41,16 @@ public interface RecordReader {
Object next(Object previous) throws IOException;
/**
+ * Read the next row batch. The size of the batch to read cannot be controlled
+ * by the caller. Callers need to look at VectorizedRowBatch.size of the returned
+ * object to know how many rows were read.
+ * @param previousBatch a row batch object that can be reused by the reader
+ * @return the row batch that was read
+ * @throws java.io.IOException
+ */
+ VectorizedRowBatch nextBatch(VectorizedRowBatch previousBatch) throws IOException;
+
+ /**
* Get the row number of the row that will be returned by the following
* call to next().
* @return the row number from 0 to the number of rows in the file
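
A sketch of the intended calling pattern for the new batch API (reader construction
and the per-row handling are assumed):

    // Reuse one VectorizedRowBatch across calls; the reader picks the actual
    // batch size, so consult batch.size after every nextBatch() call.
    VectorizedRowBatch batch = null;
    while (reader.hasNext()) {
      batch = reader.nextBatch(batch);
      for (int r = 0; r < batch.size; r++) {
        // consume row r of the batch (application-specific)
      }
    }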
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java Tue Oct 1 03:12:23 2013
@@ -27,15 +27,17 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
-import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@ -234,6 +236,38 @@ class RecordReaderImpl implements Record
}
return previous;
}
+ /**
+ * Populates the isNull vector array in the previousVector object based on
+ * the present stream values. This method is called by all the child
+ * readers, which then fill in values according to the isNull flags.
+ * @param previousVector The columnVector object whose isNull value is populated
+ * @param batchSize Size of the column vector
+ * @return the previousVector object, with its isNull array populated
+ * @throws IOException
+ */
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+
+ ColumnVector result = (ColumnVector) previousVector;
+ if (present != null) {
+ // Set noNulls and isNull vector of the ColumnVector based on
+ // present stream
+ result.noNulls = true;
+ for (int i = 0; i < batchSize; i++) {
+ result.isNull[i] = (present.next() != 1);
+ if (result.noNulls && result.isNull[i]) {
+ result.noNulls = false;
+ }
+ }
+ } else {
+ // There is no present stream, which means that all the values are
+ // present.
+ result.noNulls = true;
+ for (int i = 0; i < batchSize; i++) {
+ result.isNull[i] = false;
+ }
+ }
+ return previousVector;
+ }
}
private static class BooleanTreeReader extends TreeReader{
@@ -277,6 +311,23 @@ class RecordReaderImpl implements Record
}
return result;
}
+
+ @Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
}
private static class ByteTreeReader extends TreeReader{
@@ -317,6 +368,23 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -370,6 +438,23 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -423,6 +508,23 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -476,6 +578,23 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ reader.nextVector(result, batchSize);
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -520,6 +639,39 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ DoubleColumnVector result = null;
+ if (previousVector == null) {
+ result = new DoubleColumnVector();
+ } else {
+ result = (DoubleColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ result.vector[i] = SerializationUtils.readFloat(stream);
+ } else {
+
+ // If the value is not present then set NaN
+ result.vector[i] = Double.NaN;
+ }
+ }
+
+ // Set isRepeating flag
+ result.isRepeating = true;
+ for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+ if (result.vector[i] != result.vector[i + 1]) {
+ result.isRepeating = false;
+ }
+ }
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; ++i) {
@@ -568,6 +720,38 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ DoubleColumnVector result = null;
+ if (previousVector == null) {
+ result = new DoubleColumnVector();
+ } else {
+ result = (DoubleColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read value entries based on isNull entries
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ result.vector[i] = SerializationUtils.readDouble(stream);
+ } else {
+ // If the value is not present then set NaN
+ result.vector[i] = Double.NaN;
+ }
+ }
+
+ // Set isRepeating flag
+ result.isRepeating = true;
+ for (int i = 0; (i < batchSize - 1 && result.isRepeating); i++) {
+ if (result.vector[i] != result.vector[i + 1]) {
+ result.isRepeating = false;
+ }
+ }
+ return result;
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
stream.skip(items * 8);
@@ -636,6 +820,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextBatch is not supported operation for Binary type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
@@ -649,6 +839,7 @@ class RecordReaderImpl implements Record
private static class TimestampTreeReader extends TreeReader{
private IntegerReader data = null;
private IntegerReader nanos = null;
+ private final LongColumnVector nanoVector = new LongColumnVector();
TimestampTreeReader(Path path, int columnId) {
super(path, columnId);
@@ -708,6 +899,53 @@ class RecordReaderImpl implements Record
return result;
}
+ @Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ LongColumnVector result = null;
+ if (previousVector == null) {
+ result = new LongColumnVector();
+ } else {
+ result = (LongColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ data.nextVector(result, batchSize);
+ nanoVector.isNull = result.isNull;
+ nanos.nextVector(nanoVector, batchSize);
+
+ if(result.isRepeating && nanoVector.isRepeating) {
+ batchSize = 1;
+ }
+
+ // Non-repeating values present in the vector. Iterate through the vector and populate the time
+ for (int i = 0; i < batchSize; i++) {
+ if (!result.isNull[i]) {
+ long ms = (result.vector[result.isRepeating ? 0 : i] + WriterImpl.BASE_TIMESTAMP)
+ * WriterImpl.MILLIS_PER_SECOND;
+ long ns = parseNanos(nanoVector.vector[nanoVector.isRepeating ? 0 : i]);
+ // the rounding error exists because Java truncates integer division toward zero
+ // -42001/1000 = -42; and -42001 % 1000 = -1 (+ 1000)
+ // to get the correct value we need
+ // (-42 - 1)*1000 + 999 = -42001
+ // (42)*1000 + 1 = 42001
+ if(ms < 0 && ns != 0) {
+ ms -= 1000;
+ }
+ // Convert millis into nanos and add the nano vector value to it
+ result.vector[i] = (ms * 1000000) + ns;
+ }
+ }
+
+ if(!(result.isRepeating && nanoVector.isRepeating)) {
+ // both have to repeat for the result to be repeating
+ result.isRepeating = false;
+ }
+
+ return result;
+ }
+
private static int parseNanos(long serialized) {
int zeros = 7 & (int) serialized;
int result = (int) serialized >>> 3;
@@ -826,6 +1064,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Decimal type");
+ }
+
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
for(int i=0; i < items; i++) {
@@ -885,6 +1129,11 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ return reader.nextVector(previousVector, batchSize);
+ }
+
+ @Override
void skipRows(long items) throws IOException {
reader.skipRows(items);
}
@@ -898,8 +1147,11 @@ class RecordReaderImpl implements Record
private InStream stream;
private IntegerReader lengths;
+ private final LongColumnVector scratchlcv;
+
StringDirectTreeReader(Path path, int columnId) {
super(path, columnId);
+ scratchlcv = new LongColumnVector();
}
@Override
@@ -958,6 +1210,72 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ BytesColumnVector result = null;
+ if (previousVector == null) {
+ result = new BytesColumnVector();
+ } else {
+ result = (BytesColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ // Read lengths
+ scratchlcv.isNull = result.isNull;
+ lengths.nextVector(scratchlcv, batchSize);
+ int totalLength = 0;
+ if (!scratchlcv.isRepeating) {
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ totalLength += (int) scratchlcv.vector[i];
+ }
+ }
+ } else {
+ if (!scratchlcv.isNull[0]) {
+ totalLength = (int) (batchSize * scratchlcv.vector[0]);
+ }
+ }
+
+ // Read all the strings for this batch
+ byte[] allBytes = new byte[totalLength];
+ int offset = 0;
+ int len = totalLength;
+ while (len > 0) {
+ int bytesRead = stream.read(allBytes, offset, len);
+ if (bytesRead < 0) {
+ throw new EOFException("Can't finish byte read from " + stream);
+ }
+ len -= bytesRead;
+ offset += bytesRead;
+ }
+
+ // Too expensive to figure out 'repeating' by comparisons.
+ result.isRepeating = false;
+ offset = 0;
+ if (!scratchlcv.isRepeating) {
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
+ offset += scratchlcv.vector[i];
+ } else {
+ result.setRef(i, allBytes, 0, 0);
+ }
+ }
+ } else {
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
+ offset += scratchlcv.vector[0];
+ } else {
+ result.setRef(i, allBytes, 0, 0);
+ }
+ }
+ }
+ return result;
+ }
+
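Note the shape of the method above: one bulk read fills allBytes, and every row is then exposed through setRef without copying bytes per row. A minimal sketch of that zero-copy pattern with hypothetical strings:

    byte[] allBytes = "foohello".getBytes();   // one contiguous buffer
    BytesColumnVector result = new BytesColumnVector();
    result.setRef(0, allBytes, 0, 3);          // row 0 -> "foo", no copy
    result.setRef(1, allBytes, 3, 5);          // row 1 -> "hello", no copy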
+ @Override
void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
@@ -977,8 +1295,12 @@ class RecordReaderImpl implements Record
private int[] dictionaryOffsets;
private IntegerReader reader;
+ private byte[] dictionaryBufferInBytesCache = null;
+ private final LongColumnVector scratchlcv;
+
StringDictionaryTreeReader(Path path, int columnId) {
super(path, columnId);
+ scratchlcv = new LongColumnVector();
}
@Override
@@ -1004,6 +1326,8 @@ class RecordReaderImpl implements Record
if (in.available() > 0) {
dictionaryBuffer = new DynamicByteArray(64, in.available());
dictionaryBuffer.readAll(in);
+ // Since it's the start of a stripe, invalidate the cache.
+ dictionaryBufferInBytesCache = null;
} else {
dictionaryBuffer = null;
}
@@ -1050,14 +1374,7 @@ class RecordReaderImpl implements Record
result = (Text) previous;
}
int offset = dictionaryOffsets[entry];
- int length;
- // if it isn't the last entry, subtract the offsets otherwise use
- // the buffer length.
- if (entry < dictionaryOffsets.length - 1) {
- length = dictionaryOffsets[entry + 1] - offset;
- } else {
- length = dictionaryBuffer.size() - offset;
- }
+ int length = getDictionaryEntryLength(entry, offset);
// If the column is just empty strings, the size will be zero,
// so the buffer will be null, in that case just return result
// as it will default to empty
@@ -1071,6 +1388,74 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ BytesColumnVector result = null;
+ int offset = 0, length = 0;
+ if (previousVector == null) {
+ result = new BytesColumnVector();
+ } else {
+ result = (BytesColumnVector) previousVector;
+ }
+
+ // Read present/isNull stream
+ super.nextVector(result, batchSize);
+
+ if (dictionaryBuffer != null) {
+
+ // Load dictionaryBuffer into cache.
+ if (dictionaryBufferInBytesCache == null) {
+ dictionaryBufferInBytesCache = dictionaryBuffer.get();
+ }
+
+ // Read string offsets
+ scratchlcv.isNull = result.isNull;
+ reader.nextVector(scratchlcv, batchSize);
+ if (!scratchlcv.isRepeating) {
+
+ // The vector has non-repeating strings. Iterate through the batch
+ // and set the strings one by one.
+ for (int i = 0; i < batchSize; i++) {
+ if (!scratchlcv.isNull[i]) {
+ offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
+ length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
+ result.setRef(i, dictionaryBufferInBytesCache, offset, length);
+ } else {
+ // If the value is null then set offset and length to zero (null string)
+ result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
+ }
+ }
+ } else {
+ // If the value is repeating then just set the first value in the
+ // vector and set the isRepeating flag to true. No need to iterate through
+ // and set all the elements to the same value.
+ offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
+ length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
+ result.setRef(0, dictionaryBufferInBytesCache, offset, length);
+ }
+ result.isRepeating = scratchlcv.isRepeating;
+ } else {
+ // Entire stripe contains null strings.
+ result.isRepeating = true;
+ result.noNulls = false;
+ result.isNull[0] = true;
+ result.setRef(0, "".getBytes(), 0, 0);
+ }
+ return result;
+ }
+
+ int getDictionaryEntryLength(int entry, int offset) {
+ int length = 0;
+ // if it isn't the last entry, subtract the offsets otherwise use
+ // the buffer length.
+ if (entry < dictionaryOffsets.length - 1) {
+ length = dictionaryOffsets[entry + 1] - offset;
+ } else {
+ length = dictionaryBuffer.size() - offset;
+ }
+ return length;
+ }
+
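A worked example of the offset arithmetic in getDictionaryEntryLength, using a hypothetical two-entry dictionary packed as "foohello":

    int[] dictionaryOffsets = {0, 3};  // entry i starts at dictionaryOffsets[i]
    int bufferSize = 8;                // stands in for dictionaryBuffer.size()
    // entry 0: dictionaryOffsets[1] - dictionaryOffsets[0] = 3  -> "foo"
    // entry 1 (the last): bufferSize - dictionaryOffsets[1] = 5 -> "hello"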
+ @Override
void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@@ -1162,6 +1547,28 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ ColumnVector[] result = null;
+ if (previousVector == null) {
+ result = new ColumnVector[fields.length];
+ } else {
+ result = (ColumnVector[]) previousVector;
+ }
+
+ // Read all the members of struct as column vectors
+ for (int i = 0; i < fields.length; i++) {
+ if (fields[i] != null) {
+ if (result[i] == null) {
+ result[i] = (ColumnVector) fields[i].nextVector(null, batchSize);
+ } else {
+ fields[i].nextVector(result[i], batchSize);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -1231,6 +1638,12 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previousVector, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Union type");
+ }
+
+ @Override
void startStripe(Map<StreamName, InStream> streams,
List<OrcProto.ColumnEncoding> encodings
) throws IOException {
@@ -1308,6 +1721,11 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for List type");
+ }
+
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
@@ -1396,6 +1814,11 @@ class RecordReaderImpl implements Record
}
@Override
+ Object nextVector(Object previous, long batchSize) throws IOException {
+ throw new UnsupportedOperationException(
+ "NextVector is not supported operation for Map type");
+ }
+
void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
@@ -2196,6 +2619,31 @@ class RecordReaderImpl implements Record
}
@Override
+ public VectorizedRowBatch nextBatch(VectorizedRowBatch previous) throws IOException {
+ VectorizedRowBatch result = null;
+ if (rowInStripe >= rowCountInStripe) {
+ currentStripe += 1;
+ readStripe();
+ }
+
+ long batchSize = Math.min(VectorizedRowBatch.DEFAULT_SIZE, (rowCountInStripe - rowInStripe));
+ rowInStripe += batchSize;
+ if (previous == null) {
+ ColumnVector[] cols = (ColumnVector[]) reader.nextVector(null, (int) batchSize);
+ result = new VectorizedRowBatch(cols.length);
+ result.cols = cols;
+ } else {
+ result = (VectorizedRowBatch) previous;
+ result.selectedInUse = false;
+ reader.nextVector(result.cols, (int) batchSize);
+ }
+
+ result.size = (int) batchSize;
+ advanceToNextRow(rowInStripe + rowBaseInStripe);
+ return result;
+ }
+
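A hedged consumer-side sketch of the batch API above: handing the previous batch back in lets the reader reuse its column vectors instead of reallocating them. Here rows is a hypothetical RecordReader, and hasNext() is the reader's existing row-level method:

    VectorizedRowBatch batch = null;
    while (rows.hasNext()) {
      batch = rows.nextBatch(batch);   // first call allocates, later calls reuse
      for (int r = 0; r < batch.size; r++) {
        // consume row r of each ColumnVector in batch.cols
      }
    }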
+ @Override
public void close() throws IOException {
file.close();
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthByteReader.java Tue Oct 1 03:12:23 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
/**
* A reader that reads a sequence of bytes. A control byte is read before
* each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the
@@ -82,6 +84,29 @@ class RunLengthByteReader {
return result;
}
+ void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value for nulls on int types in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+
+ // The default value for nulls on int types in vectorization is 1,
+ // and since a non-null value can also be 1, we must also check isNull
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && ((previous.vector[i - 1] != previous.vector[i]) || (previous.isNull[i - 1] != previous.isNull[i]))) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
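A two-row sketch (hypothetical values, not from the patch) of why the check above compares isNull as well as the values: the null placeholder is 1, and a genuine 1 can sit right next to it:

    boolean[] isNull = {true, false};
    long[] vector = {1, 1};  // placeholder for the null, then a real 1
    boolean isRepeating = vector[0] == vector[1] && isNull[0] == isNull[1];
    // isRepeating is false: the values match, but row 0 is null and row 1 is not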
void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReader.java Tue Oct 1 03:12:23 2013
@@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+
/**
* A reader that reads a sequence of integers.
* */
@@ -91,6 +93,30 @@ class RunLengthIntegerReader implements
}
@Override
+ public void nextVector(LongColumnVector previous, long previousLen)
+ throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value for nulls on int types in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+
+ // The default value for nulls on int types in vectorization is 1,
+ // and since a non-null value can also be 1, we must also check isNull
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && (previous.vector[i - 1] != previous.vector[i] || previous.isNull[i - 1] != previous.isNull[i])) {
+ previous.isRepeating = false;
+ }
+ }
+ }
+
+ @Override
public void seek(PositionProvider index) throws IOException {
input.seek(index);
int consumed = (int) index.getNext();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RunLengthIntegerReaderV2.java Tue Oct 1 03:12:23 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
import java.io.EOFException;
import java.io.IOException;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.io.orc.RunLengthIntegerWriterV2.EncodingType;
/**
@@ -322,4 +323,28 @@ class RunLengthIntegerReaderV2 implement
numValues -= consume;
}
}
+
+ @Override
+ public void nextVector(LongColumnVector previous, long previousLen) throws IOException {
+ previous.isRepeating = true;
+ for (int i = 0; i < previousLen; i++) {
+ if (!previous.isNull[i]) {
+ previous.vector[i] = next();
+ } else {
+ // The default value for nulls on int types in vectorized
+ // processing is 1, so set that if the value is null
+ previous.vector[i] = 1;
+ }
+
+ // The default value for nulls on int types in vectorization is 1,
+ // and since a non-null value can also be 1, we must also check isNull
+ // when determining the isRepeating flag.
+ if (previous.isRepeating
+ && i > 0
+ && (previous.vector[i - 1] != previous.vector[i] ||
+ previous.isNull[i - 1] != previous.isNull[i])) {
+ previous.isRepeating = false;
+ }
+ }
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/PhysicalOptimizer.java Tue Oct 1 03:12:23 2013
@@ -77,6 +77,13 @@ public class PhysicalOptimizer {
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_INFER_BUCKET_SORT)) {
resolvers.add(new BucketingSortingInferenceOptimizer());
}
+
+ // Vectorization should be the last optimization because it doesn't modify
+ // the plan or any operators; it applies a very low-level transformation to
+ // the expressions so that they run in vectorized mode.
+ if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED)) {
+ resolvers.add(new Vectorizer());
+ }
}
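A hedged sketch of flipping that gate programmatically; the ConfVars name comes from this patch, though in practice the flag would be toggled from the Hive session rather than built in code:

    HiveConf conf = new HiveConf();
    conf.setBoolVar(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED, true);
    // A PhysicalOptimizer built with this conf appends the Vectorizer
    // resolver last, after all plan-shape optimizations have run.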
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/TypeCheckProcFactory.java Tue Oct 1 03:12:23 2013
@@ -903,7 +903,7 @@ public final class TypeCheckProcFactory
}
}
- desc = ExprNodeGenericFuncDesc.newInstance(genericUDF, children);
+ desc = ExprNodeGenericFuncDesc.newInstance(genericUDF, funcText, children);
}
// UDFOPPositive is a no-op.
// However, we still create it, and then remove it here, to make sure we
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AbstractOperatorDesc.java Tue Oct 1 03:12:23 2013
@@ -19,8 +19,20 @@
package org.apache.hadoop.hive.ql.plan;
public class AbstractOperatorDesc implements OperatorDesc {
+
+ private boolean vectorMode = false;
+
@Override
public Object clone() throws CloneNotSupportedException {
throw new CloneNotSupportedException("clone not supported");
}
+
+ @Explain(displayName = "Vectorized execution", displayOnlyOnTrue = true)
+ public boolean getVectorModeOn() {
+ return vectorMode;
+ }
+
+ public void setVectorMode(boolean vm) {
+ this.vectorMode = vm;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/ExprNodeGenericFuncDesc.java Tue Oct 1 03:12:23 2013
@@ -62,6 +62,7 @@ public class ExprNodeGenericFuncDesc ext
*/
private GenericUDF genericUDF;
private List<ExprNodeDesc> childExprs;
+ private transient String funcText;
/**
* This class uses a writableObjectInspector rather than a TypeInfo to store
* the canonical type information for this NodeDesc.
@@ -73,13 +74,19 @@ public class ExprNodeGenericFuncDesc ext
public ExprNodeGenericFuncDesc() {
}
+ /* If the function has an explicit name like func(args), then call a
+ * constructor that explicitly provides the function name in the
+ * funcText argument.
+ */
public ExprNodeGenericFuncDesc(TypeInfo typeInfo, GenericUDF genericUDF,
+ String funcText,
List<ExprNodeDesc> children) {
this(TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo),
- genericUDF, children);
+ genericUDF, funcText, children);
}
public ExprNodeGenericFuncDesc(ObjectInspector oi, GenericUDF genericUDF,
+ String funcText,
List<ExprNodeDesc> children) {
super(TypeInfoUtils.getTypeInfoFromObjectInspector(oi));
this.writableObjectInspector =
@@ -87,6 +94,18 @@ public class ExprNodeGenericFuncDesc ext
assert (genericUDF != null);
this.genericUDF = genericUDF;
this.childExprs = children;
+ this.funcText = funcText;
+ }
+
+ // Backward-compatibility interfaces for functions without a user-visible name.
+ public ExprNodeGenericFuncDesc(TypeInfo typeInfo, GenericUDF genericUDF,
+ List<ExprNodeDesc> children) {
+ this(typeInfo, genericUDF, null, children);
+ }
+
+ public ExprNodeGenericFuncDesc(ObjectInspector oi, GenericUDF genericUDF,
+ List<ExprNodeDesc> children) {
+ this(oi, genericUDF, null, children);
}
@Override
@@ -165,17 +184,20 @@ public class ExprNodeGenericFuncDesc ext
cloneCh.add(ch.clone());
}
ExprNodeGenericFuncDesc clone = new ExprNodeGenericFuncDesc(typeInfo,
- FunctionRegistry.cloneGenericUDF(genericUDF), cloneCh);
+ FunctionRegistry.cloneGenericUDF(genericUDF), funcText, cloneCh);
return clone;
}
/**
- * Create a exprNodeGenericFuncDesc based on the genericUDFClass and the
- * children parameters.
+ * Create an ExprNodeGenericFuncDesc based on the genericUDF and the
+ * children parameters. If the function has an explicit name, the
+ * newInstance method should be passed the function name in the funcText
+ * argument.
*
* @throws UDFArgumentException
*/
public static ExprNodeGenericFuncDesc newInstance(GenericUDF genericUDF,
+ String funcText,
List<ExprNodeDesc> children) throws UDFArgumentException {
ObjectInspector[] childrenOIs = new ObjectInspector[children.size()];
for (int i = 0; i < childrenOIs.length; i++) {
@@ -232,7 +254,15 @@ public class ExprNodeGenericFuncDesc ext
}
}
- return new ExprNodeGenericFuncDesc(oi, genericUDF, children);
+ return new ExprNodeGenericFuncDesc(oi, genericUDF, funcText, children);
+ }
+
+ /* Backward-compatibility interface for the case where there is no explicit
+ * name for the function.
+ */
+ public static ExprNodeGenericFuncDesc newInstance(GenericUDF genericUDF,
+ List<ExprNodeDesc> children) throws UDFArgumentException {
+ return newInstance(genericUDF, null, children);
}
@Override
@@ -285,4 +315,8 @@ public class ExprNodeGenericFuncDesc ext
public void setSortedExpr(boolean isSortedExpr) {
this.isSortedExpr = isSortedExpr;
}
+
+ public String getFuncText() {
+ return this.funcText;
+ }
}
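A hedged sketch of the new funcText plumbing end to end, inside a method that declares UDFArgumentException; the function name "concat" and the constant children are purely illustrative:

    GenericUDF udf = FunctionRegistry.getFunctionInfo("concat").getGenericUDF();
    List<ExprNodeDesc> children = new ArrayList<ExprNodeDesc>();
    children.add(new ExprNodeConstantDesc("a"));
    children.add(new ExprNodeConstantDesc("b"));
    ExprNodeGenericFuncDesc desc =
        ExprNodeGenericFuncDesc.newInstance(udf, "concat", children);
    // desc.getFuncText() now returns "concat"; the two-argument newInstance
    // still works and leaves funcText null for pre-existing callers.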
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapWork.java Tue Oct 1 03:12:23 2013
@@ -112,6 +112,9 @@ public class MapWork extends BaseWork {
private boolean useBucketizedHiveInputFormat;
+ private Map<String, Map<Integer, String>> scratchColumnVectorTypes = null;
+ private boolean vectorMode = false;
+
public MapWork() {
}
@@ -479,4 +482,21 @@ public class MapWork extends BaseWork {
PlanUtils.configureJobConf(fs.getConf().getTableInfo(), job);
}
}
+
+ public Map<String, Map<Integer, String>> getScratchColumnVectorTypes() {
+ return scratchColumnVectorTypes;
+ }
+
+ public void setScratchColumnVectorTypes(
+ Map<String, Map<Integer, String>> scratchColumnVectorTypes) {
+ this.scratchColumnVectorTypes = scratchColumnVectorTypes;
+ }
+
+ public boolean getVectorMode() {
+ return vectorMode;
+ }
+
+ public void setVectorMode(boolean vectorMode) {
+ this.vectorMode = vectorMode;
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFHex.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFHex.java?rev=1527858&r1=1527857&r2=1527858&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFHex.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/udf/UDFHex.java Tue Oct 1 03:12:23 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.udf;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDF;
+import org.apache.hadoop.hive.ql.exec.vector.expressions.StringUnaryUDF.IUDFUnaryString;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
@@ -39,7 +40,7 @@ import org.apache.hadoop.io.Text;
+ " 'H1'\n"
+ " > SELECT _FUNC_('Facebook') FROM src LIMIT 1;\n"
+ " '46616365626F6F6B'")
-public class UDFHex extends UDF {
+public class UDFHex extends UDF implements IUDFUnaryString {
private final Text result = new Text();
private byte[] value = new byte[16];
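Implementing IUDFUnaryString is what lets the vectorized StringUnaryUDF expression drive this same scalar code path. A minimal usage sketch, assuming the interface simply exposes the class's existing Text evaluate(Text) method:

    IUDFUnaryString hex = new UDFHex();
    Text out = hex.evaluate(new Text("Facebook"));
    System.out.println(out);  // 46616365626F6F6B, matching the javadoc example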