You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/30 22:26:03 UTC
svn commit: r1477812 - in
/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql:
exec/vector/VectorizedSerde.java io/orc/CommonOrcInputFormat.java
io/orc/OrcSerde.java io/orc/VectorizedOrcInputFormat.java
io/orc/VectorizedOrcSerde.java
Author: hashutosh
Date: Tue Apr 30 20:26:03 2013
New Revision: 1477812
URL: http://svn.apache.org/r1477812
Log:
HIVE-4453 : Input format to read vector data from ORC (Jitendra Nath Pandey via Ashutosh Chauhan)
Added:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
Modified:
hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java?rev=1477812&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java Tue Apr 30 20:26:03 2013
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Implemented by SerDes that can serialize an entire {@link VectorizedRowBatch}
+ * in one call instead of one row object at a time.
+ */
+public interface VectorizedSerde {
+
+  /**
+   * Serializes the rows of a vectorized batch in a single call.
+   *
+   * @param vrg the batch to serialize
+   * @param objInspector the object inspector for the rows being serialized
+   * @return a writable holding the serialized batch
+   * @throws SerDeException if the batch cannot be serialized
+   */
+  Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+      throws SerDeException;
+}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java?rev=1477812&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java Tue Apr 30 20:26:03 2013
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+
+public class CommonOrcInputFormat extends FileInputFormat<NullWritable, Writable>
+ implements InputFormatChecker {
+
+ OrcInputFormat oif = new OrcInputFormat();
+ VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
+
+ private static class CommonOrcRecordReader
+ implements RecordReader<NullWritable, Writable> {
+
+ final RecordReader<NullWritable, VectorizedRowBatch> vorr;
+ final RecordReader<NullWritable, OrcStruct> orr;
+
+ public CommonOrcRecordReader(RecordReader<NullWritable, VectorizedRowBatch> vorr,
+ RecordReader<NullWritable, OrcStruct> orr) {
+ this.vorr = vorr;
+ this.orr = orr;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (vorr != null) {
+ vorr.close();
+ } else {
+ orr.close();
+ }
+
+ }
+
+ @Override
+ public NullWritable createKey() {
+ if (vorr != null) {
+ return vorr.createKey();
+ } else {
+ return orr.createKey();
+ }
+ }
+
+ @Override
+ public Writable createValue() {
+ if (vorr != null) {
+ return vorr.createValue();
+ } else {
+ return orr.createValue();
+ }
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ if (vorr != null) {
+ return vorr.getPos();
+ } else {
+ return orr.getPos();
+ }
+ }
+
+ @Override
+ public float getProgress() throws IOException {
+ if (vorr != null) {
+ return vorr.getProgress();
+ } else {
+ return orr.getProgress();
+ }
+ }
+
+ @Override
+ public boolean next(NullWritable arg0, Writable arg1) throws IOException {
+ if (vorr != null) {
+ return vorr.next(arg0, (VectorizedRowBatch) arg1);
+ } else {
+ return orr.next(arg0, (OrcStruct) arg1);
+ }
+ }
+
+ }
+
+ @Override
+ public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files)
+ throws IOException {
+ boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
+ false);
+ if (vectorPath) {
+ return voif.validateInput(fs, conf, files);
+ } else {
+ return oif.validateInput(fs, conf, files);
+ }
+ }
+
+ @Override
+ public RecordReader<NullWritable, Writable> getRecordReader(InputSplit split, JobConf conf,
+ Reporter reporter) throws IOException {
+ boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
+ false);
+ if (vectorPath) {
+ RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(split, conf,
+ reporter);
+ return new CommonOrcRecordReader(vorr, null);
+ } else {
+ RecordReader<NullWritable, OrcStruct> orr = oif.getRecordReader(split, conf, reporter);
+ return new CommonOrcRecordReader(null, orr);
+ }
+ }
+}
Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1477812&r1=1477811&r2=1477812&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Tue Apr 30 20:26:03 2013
@@ -17,7 +17,15 @@
*/
package org.apache.hadoop.hive.ql.io.orc;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -27,23 +35,19 @@ import org.apache.hadoop.hive.serde2.typ
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Writable;
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Properties;
-
/**
* A serde class for ORC.
* It transparently passes the object to/from the ORC file reader/writer.
*/
-public class OrcSerde implements SerDe {
+public class OrcSerde implements SerDe, VectorizedSerde {
private final OrcSerdeRow row = new OrcSerdeRow();
private ObjectInspector inspector = null;
+ private VectorizedOrcSerde vos = null;
+
final class OrcSerdeRow implements Writable {
- private Object realRow;
- private ObjectInspector inspector;
+ Object realRow;
+ ObjectInspector inspector;
@Override
public void write(DataOutput dataOutput) throws IOException {
@@ -129,4 +133,13 @@ public class OrcSerde implements SerDe {
public SerDeStats getSerDeStats() {
return null;
}
+
+ @Override
+ public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+ throws SerDeException {
+ if (vos == null) {
+ vos = new VectorizedOrcSerde();
+ }
+ return vos.serialize(vrg, objInspector);
+ }
}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1477812&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Tue Apr 30 20:26:03 2013
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A MapReduce/Hive input format for ORC files that returns rows grouped into
+ * {@link VectorizedRowBatch}es instead of one row at a time.
+ */
+public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
+    implements InputFormatChecker {
+
+  /** Reads the rows of one file split, one batch per call to {@link #next}. */
+  private static class VectorizedOrcRecordReader
+      implements RecordReader<NullWritable, VectorizedRowBatch> {
+    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+    private final long offset;
+    private final long length;
+    // number of top-level columns: the children of the root (type 0) of the schema
+    private final int numColumns;
+    private float progress = 0.0f;
+
+    VectorizedOrcRecordReader(Reader file, Configuration conf,
+        long offset, long length) throws IOException {
+      List<OrcProto.Type> types = file.getTypes();
+      this.reader = file.rows(offset, length, findIncludedColumns(types, conf));
+      this.numColumns = types.isEmpty() ? 0 : types.get(0).getSubtypesCount();
+      this.offset = offset;
+      this.length = length;
+    }
+
+    @Override
+    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+      if (!reader.hasNext()) {
+        return false;
+      }
+      // NOTE(review): nextBatch's return value is ignored, so this relies on it
+      // filling the supplied batch in place -- confirm against the ORC RecordReader.
+      reader.nextBatch(value);
+      progress = reader.getProgress();
+      return true;
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public VectorizedRowBatch createValue() {
+      return new VectorizedRowBatch(numColumns, VectorizedRowBatch.DEFAULT_SIZE);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      // estimated from the reader's progress through this split
+      return offset + (long) (progress * length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return progress;
+    }
+  }
+
+  public VectorizedOrcInputFormat() {
+    // just set a really small lower bound
+    setMinSplitSize(16 * 1024);
+  }
+
+  /**
+   * Recurse down into a type subtree turning on all of the sub-columns.
+   * @param types the types of the file
+   * @param result the global view of columns that should be included
+   * @param typeId the root of tree to enable
+   */
+  private static void includeColumnRecursive(List<OrcProto.Type> types,
+                                             boolean[] result,
+                                             int typeId) {
+    result[typeId] = true;
+    OrcProto.Type type = types.get(typeId);
+    int children = type.getSubtypesCount();
+    for (int i = 0; i < children; ++i) {
+      includeColumnRecursive(types, result, type.getSubtypes(i));
+    }
+  }
+
+  /**
+   * Take the configuration and figure out which columns we need to include.
+   * @param types the types of the file
+   * @param conf the configuration
+   * @return true for each column that should be included, or null to read all
+   *     columns (no projection configured, an empty schema, or nothing filtered out)
+   */
+  private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
+                                               Configuration conf) {
+    String includedStr =
+        conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+    // types.isEmpty() guards result[0] / types.get(0) below
+    if (includedStr == null || includedStr.trim().length() == 0 || types.isEmpty()) {
+      return null;
+    }
+    boolean[] result = new boolean[types.size()];
+    result[0] = true;
+    OrcProto.Type root = types.get(0);
+    // a set makes the per-column membership test constant time
+    Set<Integer> included =
+        new HashSet<Integer>(ColumnProjectionUtils.getReadColumnIDs(conf));
+    for (int i = 0; i < root.getSubtypesCount(); ++i) {
+      if (included.contains(i)) {
+        includeColumnRecursive(types, result, root.getSubtypes(i));
+      }
+    }
+    // if we are filtering at least one column, return the boolean array
+    for (boolean include : result) {
+      if (!include) {
+        return result;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public RecordReader<NullWritable, VectorizedRowBatch>
+      getRecordReader(InputSplit inputSplit, JobConf conf,
+                      Reporter reporter) throws IOException {
+    FileSplit fileSplit = (FileSplit) inputSplit;
+    Path path = fileSplit.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    if (reporter != null) {
+      reporter.setStatus(fileSplit.toString());
+    }
+    return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf,
+        fileSplit.getStart(), fileSplit.getLength());
+  }
+
+  /**
+   * Accepts a non-empty list of files only if every one can be opened as an ORC file.
+   */
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+                               ArrayList<FileStatus> files) throws IOException {
+    if (files.isEmpty()) {
+      return false;
+    }
+    for (FileStatus file : files) {
+      try {
+        OrcFile.createReader(fs, file.getPath());
+      } catch (IOException ignored) {
+        // not readable as an ORC file, so reject the whole input
+        return false;
+      }
+    }
+    return true;
+  }
+}
Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java?rev=1477812&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java Tue Apr 30 20:26:03 2013
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * An ORC serde that serializes a whole {@link VectorizedRowBatch} per call.
+ * Each call converts the batch's rows into reused {@link OrcStruct} and
+ * {@code OrcSerdeRow} objects and returns them as a {@code Writable[]}
+ * wrapped in an {@link ObjectWritable}.
+ */
+public class VectorizedOrcSerde extends OrcSerde {
+  // Per-row objects, allocated once and reused for every batch.
+  private final OrcStruct[] orcStructArray = new OrcStruct[VectorizedRowBatch.DEFAULT_SIZE];
+  private final Writable[] orcRowArray = new Writable[VectorizedRowBatch.DEFAULT_SIZE];
+  private final ObjectWritable ow = new ObjectWritable();
+
+  public VectorizedOrcSerde() {
+    super();
+    for (int i = 0; i < orcRowArray.length; i++) {
+      orcRowArray[i] = new OrcSerdeRow();
+    }
+  }
+
+  /**
+   * Serializes the rows of a batch, honoring its selection vector.
+   *
+   * <p>Only the first {@code batch.size} entries of the returned array are
+   * filled in for this batch; later entries still refer to rows from earlier
+   * batches (or to no row at all), so consumers must stop at
+   * {@code batch.size}. The returned writable and its row objects are
+   * reused by the next call.
+   *
+   * @param obj the {@link VectorizedRowBatch} to serialize
+   * @param inspector the object inspector stored on every serialized row
+   * @return an {@link ObjectWritable} wrapping the internal {@code Writable[]}
+   */
+  @Override
+  public Writable serialize(Object obj, ObjectInspector inspector) {
+    VectorizedRowBatch batch = (VectorizedRowBatch) obj;
+    for (int i = 0; i < batch.size; i++) {
+      OrcStruct ost = orcStructArray[i];
+      if (ost == null) {
+        ost = new OrcStruct(batch.numCols);
+        orcStructArray[i] = ost;
+      }
+      // logical row i is physical row selected[i] when a selection vector is in use
+      int index = batch.selectedInUse ? batch.selected[i] : i;
+      for (int k = 0; k < batch.numCols; k++) {
+        // NOTE(review): if getWritableObject returns a Writable that the column
+        // vector reuses across calls, all cached OrcStructs alias it and end up
+        // seeing only the last row's value -- confirm against ColumnVector.
+        ost.setFieldValue(k, batch.cols[k].getWritableObject(index));
+      }
+      OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
+      row.realRow = ost;
+      row.inspector = inspector;
+    }
+    ow.set(orcRowArray);
+    return ow;
+  }
+}