You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by ha...@apache.org on 2013/04/30 22:26:03 UTC

svn commit: r1477812 - in /hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql: exec/vector/VectorizedSerde.java io/orc/CommonOrcInputFormat.java io/orc/OrcSerde.java io/orc/VectorizedOrcInputFormat.java io/orc/VectorizedOrcSerde.java

Author: hashutosh
Date: Tue Apr 30 20:26:03 2013
New Revision: 1477812

URL: http://svn.apache.org/r1477812
Log:
HIVE-4453 : Input format to read vector data from ORC (Jitendra Nath Pandey via Ashutosh Chauhan)

Added:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
Modified:
    hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java?rev=1477812&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedSerde.java Tue Apr 30 20:26:03 2013
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.vector;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.Writable;
+
+public interface VectorizedSerde {
+
+  Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+      throws SerDeException;
+
+}

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java?rev=1477812&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/CommonOrcInputFormat.java Tue Apr 30 20:26:03 2013
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+
+public class CommonOrcInputFormat extends FileInputFormat<NullWritable, Writable>
+    implements InputFormatChecker {
+
+  OrcInputFormat oif = new OrcInputFormat();
+  VectorizedOrcInputFormat voif = new VectorizedOrcInputFormat();
+
+  private static class CommonOrcRecordReader
+      implements RecordReader<NullWritable, Writable> {
+
+    final RecordReader<NullWritable, VectorizedRowBatch> vorr;
+    final RecordReader<NullWritable, OrcStruct> orr;
+
+    public CommonOrcRecordReader(RecordReader<NullWritable, VectorizedRowBatch> vorr,
+        RecordReader<NullWritable, OrcStruct> orr) {
+      this.vorr = vorr;
+      this.orr = orr;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (vorr != null) {
+        vorr.close();
+      } else {
+        orr.close();
+      }
+
+    }
+
+    @Override
+    public NullWritable createKey() {
+      if (vorr != null) {
+        return vorr.createKey();
+      } else {
+        return orr.createKey();
+      }
+    }
+
+    @Override
+    public Writable createValue() {
+      if (vorr != null) {
+        return vorr.createValue();
+      } else {
+        return orr.createValue();
+      }
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      if (vorr != null) {
+        return vorr.getPos();
+      } else {
+        return orr.getPos();
+      }
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      if (vorr != null) {
+        return vorr.getProgress();
+      } else {
+        return orr.getProgress();
+      }
+    }
+
+    @Override
+    public boolean next(NullWritable arg0, Writable arg1) throws IOException {
+      if (vorr != null) {
+        return vorr.next(arg0, (VectorizedRowBatch) arg1);
+      } else {
+        return orr.next(arg0, (OrcStruct) arg1);
+      }
+    }
+
+  }
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf, ArrayList<FileStatus> files)
+      throws IOException {
+    boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
+        false);
+    if (vectorPath) {
+      return voif.validateInput(fs, conf, files);
+    } else {
+      return oif.validateInput(fs, conf, files);
+    }
+  }
+
+  @Override
+  public RecordReader<NullWritable, Writable> getRecordReader(InputSplit split, JobConf conf,
+      Reporter reporter) throws IOException {
+    boolean vectorPath = conf.getBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.toString(),
+        false);
+    if (vectorPath) {
+      RecordReader<NullWritable, VectorizedRowBatch> vorr = voif.getRecordReader(split, conf,
+          reporter);
+      return new CommonOrcRecordReader(vorr, null);
+    } else {
+      RecordReader<NullWritable, OrcStruct> orr = oif.getRecordReader(split, conf, reporter);
+      return new CommonOrcRecordReader(null, orr);
+    }
+  }
+}

Modified: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java?rev=1477812&r1=1477811&r2=1477812&view=diff
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java (original)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSerde.java Tue Apr 30 20:26:03 2013
@@ -17,7 +17,15 @@
  */
 package org.apache.hadoop.hive.ql.io.orc;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Properties;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedSerde;
 import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -27,23 +35,19 @@ import org.apache.hadoop.hive.serde2.typ
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.io.Writable;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Properties;
-
 /**
  * A serde class for ORC.
  * It transparently passes the object to/from the ORC file reader/writer.
  */
-public class OrcSerde implements SerDe {
+public class OrcSerde implements SerDe, VectorizedSerde {
   private final OrcSerdeRow row = new OrcSerdeRow();
   private ObjectInspector inspector = null;
 
+  private VectorizedOrcSerde vos = null;
+
   final class OrcSerdeRow implements Writable {
-    private Object realRow;
-    private ObjectInspector inspector;
+    Object realRow;
+    ObjectInspector inspector;
 
     @Override
     public void write(DataOutput dataOutput) throws IOException {
@@ -129,4 +133,13 @@ public class OrcSerde implements SerDe {
   public SerDeStats getSerDeStats() {
     return null;
   }
+
+  @Override
+  public Writable serializeVector(VectorizedRowBatch vrg, ObjectInspector objInspector)
+      throws SerDeException {
+    if (vos == null) {
+      vos = new VectorizedOrcSerde();
+    }
+    return vos.serialize(vrg, objInspector);
+  }
 }

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java?rev=1477812&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcInputFormat.java Tue Apr 30 20:26:03 2013
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.io.orc;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.ql.io.InputFormatChecker;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * A MapReduce/Hive input format for ORC files.
+ */
+public class VectorizedOrcInputFormat extends FileInputFormat<NullWritable, VectorizedRowBatch>
+  implements InputFormatChecker {
+
+  private static class VectorizedOrcRecordReader
+      implements RecordReader<NullWritable, VectorizedRowBatch> {
+    private final org.apache.hadoop.hive.ql.io.orc.RecordReader reader;
+    private final long offset;
+    private final long length;
+    private final int numColumns;
+    private float progress = 0.0f;
+    private final OrcStruct rowObj;
+    private final List<OrcProto.Type> types;
+
+    VectorizedOrcRecordReader(Reader file, Configuration conf,
+                    long offset, long length) throws IOException {
+      this.reader = file.rows(offset, length,
+          findIncludedColumns(file.getTypes(), conf));
+      types = file.getTypes();
+      if (types.size() == 0) {
+        numColumns = 0;
+      } else {
+        numColumns = types.get(0).getSubtypesCount();
+      }
+      this.offset = offset;
+      this.length = length;
+      rowObj = new OrcStruct(numColumns);
+    }
+
+    @Override
+    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
+
+      if (!reader.hasNext()) {
+        return false;
+      }
+      reader.nextBatch(value);
+      progress = reader.getProgress();
+      return true;
+    }
+
+    @Override
+    public NullWritable createKey() {
+      return NullWritable.get();
+    }
+
+    @Override
+    public VectorizedRowBatch createValue() {
+      return new VectorizedRowBatch(numColumns,
+          VectorizedRowBatch.DEFAULT_SIZE);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return offset + (long) (progress * length);
+    }
+
+    @Override
+    public void close() throws IOException {
+      reader.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      return progress;
+    }
+  }
+
+  public VectorizedOrcInputFormat() {
+    // just set a really small lower bound
+    setMinSplitSize(16 * 1024);
+  }
+
+  /**
+   * Recurse down into a type subtree turning on all of the sub-columns.
+   * @param types the types of the file
+   * @param result the global view of columns that should be included
+   * @param typeId the root of tree to enable
+   */
+  private static void includeColumnRecursive(List<OrcProto.Type> types,
+                                             boolean[] result,
+                                             int typeId) {
+    result[typeId] = true;
+    OrcProto.Type type = types.get(typeId);
+    int children = type.getSubtypesCount();
+    for(int i=0; i < children; ++i) {
+      includeColumnRecursive(types, result, type.getSubtypes(i));
+    }
+  }
+
+  /**
+   * Take the configuration and figure out which columns we need to include.
+   * @param types the types of the file
+   * @param conf the configuration
+   * @return true for each column that should be included
+   */
+  private static boolean[] findIncludedColumns(List<OrcProto.Type> types,
+                                               Configuration conf) {
+    String includedStr =
+        conf.get(ColumnProjectionUtils.READ_COLUMN_IDS_CONF_STR);
+    if (includedStr == null || includedStr.trim().length() == 0) {
+      return null;
+    } else {
+      int numColumns = types.size();
+      boolean[] result = new boolean[numColumns];
+      result[0] = true;
+      OrcProto.Type root = types.get(0);
+      List<Integer> included = ColumnProjectionUtils.getReadColumnIDs(conf);
+      for(int i=0; i < root.getSubtypesCount(); ++i) {
+        if (included.contains(i)) {
+          includeColumnRecursive(types, result, root.getSubtypes(i));
+        }
+      }
+      // if we are filtering at least one column, return the boolean array
+      for(boolean include: result) {
+        if (!include) {
+          return result;
+        }
+      }
+      return null;
+    }
+  }
+
+  @Override
+  public RecordReader<NullWritable, VectorizedRowBatch>
+      getRecordReader(InputSplit inputSplit, JobConf conf,
+                      Reporter reporter) throws IOException {
+    FileSplit fileSplit = (FileSplit) inputSplit;
+    Path path = fileSplit.getPath();
+    FileSystem fs = path.getFileSystem(conf);
+    reporter.setStatus(fileSplit.toString());
+    return new VectorizedOrcRecordReader(OrcFile.createReader(fs, path), conf,
+                               fileSplit.getStart(), fileSplit.getLength());
+  }
+
+  @Override
+  public boolean validateInput(FileSystem fs, HiveConf conf,
+                               ArrayList<FileStatus> files
+                              ) throws IOException {
+    if (files.size() <= 0) {
+      return false;
+    }
+    for (FileStatus file : files) {
+      try {
+        OrcFile.createReader(fs, file.getPath());
+      } catch (IOException e) {
+        return false;
+      }
+    }
+    return true;
+  }
+}

Added: hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java
URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java?rev=1477812&view=auto
==============================================================================
--- hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java (added)
+++ hive/branches/vectorization/ql/src/java/org/apache/hadoop/hive/ql/io/orc/VectorizedOrcSerde.java Tue Apr 30 20:26:03 2013
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc;
+
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * A serde class for ORC.
+ * It transparently passes the object to/from the ORC file reader/writer.
+ */
+public class VectorizedOrcSerde extends OrcSerde {
+  private final OrcStruct [] orcStructArray = new OrcStruct [VectorizedRowBatch.DEFAULT_SIZE];
+  private final Writable [] orcRowArray = new Writable [VectorizedRowBatch.DEFAULT_SIZE];
+  private final ObjectWritable ow = new ObjectWritable();
+  private final ObjectInspector inspector = null;
+
+  public VectorizedOrcSerde() {
+    super();
+    for (int i = 0; i < orcStructArray.length; i++) {
+      orcRowArray[i] = new OrcSerdeRow();
+    }
+  }
+
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector inspector) {
+    VectorizedRowBatch batch = (VectorizedRowBatch)obj;
+    for (int i = 0; i < batch.size; i++) {
+      OrcStruct ost = orcStructArray[i];
+      if (ost == null) {
+        ost = new OrcStruct(batch.numCols);
+        orcStructArray[i] = ost;
+      }
+      int index = 0;
+      if (batch.selectedInUse) {
+        index = batch.selected[i];
+      } else {
+        index = i;
+      }
+      for (int k = 0; k < batch.numCols; k++) {
+        Writable w = batch.cols[k].getWritableObject(index);
+        ost.setFieldValue(k, w);
+      }
+      OrcSerdeRow row = (OrcSerdeRow) orcRowArray[i];
+      row.realRow = ost;
+      row.inspector = inspector;
+    }
+    ow.set(orcRowArray);
+    return ow;
+  }
+}