Posted to commits@pinot.apache.org by je...@apache.org on 2019/03/21 23:10:48 UTC

[incubator-pinot] branch master updated: Adding ORC reader (#3994)

This is an automated email from the ASF dual-hosted git repository.

jenniferdai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8074d0a  Adding ORC reader (#3994)
8074d0a is described below

commit 8074d0af51b34a7485a9ae5c3de7ab5aa7a4c6a0
Author: Jennifer Dai <je...@users.noreply.github.com>
AuthorDate: Thu Mar 21 16:10:43 2019 -0700

    Adding ORC reader (#3994)
    
    * This commit adds ORC reader capabilities for Pinot, excluding multi-value columns. To plug this into the Hadoop segment creation job, set record.reader.path in your Hadoop job configuration to the fully qualified name of the ORCRecordReader class (see the usage sketch below).
    * Performance generating the same segment from ORC and from Avro input:
      ORC: 3 mins 41 seconds
      Avro: 3 mins 35 seconds
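    
    A minimal usage sketch for reference (illustrative, not part of the commit; the input path and the pinotSchema variable are placeholders). For the Hadoop job, the property named above points at the new reader class:
    
        record.reader.path=org.apache.pinot.orc.data.readers.ORCRecordReader
    
    For standalone segment generation, mirroring ORCRecordReaderTest (ORCRecordReader comes from org.apache.pinot.orc.data.readers; SegmentGeneratorConfig and GenericRow from pinot-core):
    
        ORCRecordReader orcRecordReader = new ORCRecordReader();
        SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig();
        segmentGeneratorConfig.setInputFilePath("/path/to/input.orc"); // hypothetical local path; the reader prefixes file://
        segmentGeneratorConfig.setSchema(pinotSchema);                 // your Pinot schema
        orcRecordReader.init(segmentGeneratorConfig);
        while (orcRecordReader.hasNext()) {
          GenericRow row = orcRecordReader.next(); // next ORC row converted to a GenericRow
          // process row here
        }
        orcRecordReader.close();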
---
 .../java/org/apache/pinot/common/data/Schema.java  |   2 +-
 pinot-distribution/pom.xml                         |   4 +
 pinot-orc/pom.xml                                  |  77 ++++++++
 .../pinot/orc/data/readers/ORCRecordReader.java    | 205 +++++++++++++++++++++
 .../orc/data/readers/ORCRecordReaderTest.java      | 116 ++++++++++++
 pom.xml                                            |  16 ++
 6 files changed, 419 insertions(+), 1 deletion(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java b/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
index f06b14c..2db6475 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/data/Schema.java
@@ -137,7 +137,7 @@ public final class Schema {
   public List<MetricFieldSpec> getMetricFieldSpecs() {
     return _metricFieldSpecs;
   }
-
+  
   /**
    * Required by JSON deserializer. DO NOT USE. DO NOT REMOVE.
    * Adding @Deprecated to prevent usage
diff --git a/pinot-distribution/pom.xml b/pinot-distribution/pom.xml
index b8d6177..c391cdc 100644
--- a/pinot-distribution/pom.xml
+++ b/pinot-distribution/pom.xml
@@ -78,6 +78,10 @@
     </dependency>
     <dependency>
       <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-orc</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
       <artifactId>pinot-transport</artifactId>
     </dependency>
     <dependency>
diff --git a/pinot-orc/pom.xml b/pinot-orc/pom.xml
new file mode 100644
index 0000000..0b7bb8c
--- /dev/null
+++ b/pinot-orc/pom.xml
@@ -0,0 +1,77 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <parent>
+    <artifactId>pinot</artifactId>
+    <groupId>org.apache.pinot</groupId>
+    <version>0.2.0-SNAPSHOT</version>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <artifactId>pinot-orc</artifactId>
+  <name>Pinot ORC</name>
+  <url>https://pinot.apache.org/</url>
+  <properties>
+    <pinot.root>${basedir}/..</pinot.root>
+  </properties>
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+      </plugin>
+    </plugins>
+  </build>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-core</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.orc</groupId>
+      <artifactId>orc-mapreduce</artifactId>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-yarn-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.pinot</groupId>
+      <artifactId>pinot-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.testng</groupId>
+      <artifactId>testng</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+</project>
\ No newline at end of file
diff --git a/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
new file mode 100644
index 0000000..0c1b2db
--- /dev/null
+++ b/pinot-orc/src/main/java/org/apache/pinot/orc/data/readers/ORCRecordReader.java
@@ -0,0 +1,205 @@
+package org.apache.pinot.orc.data.readers;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.ShortWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.mapred.OrcList;
+import org.apache.orc.mapred.OrcMapredRecordReader;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.data.readers.RecordReader;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The ORCRecordReader uses a VectorizedRowBatch, which we convert to a Writable. Then, we convert these
+ * Writable objects to primitives that we can then store in a GenericRow.
+ *
+ * When new data types are added to Pinot, we will need to update them here as well.
+ * Note that not all ORC types are supported; we only support the ORC types that correspond to either
+ * primitives or multivalue columns in Pinot, which is similar to other record readers.
+ */
+public class ORCRecordReader implements RecordReader {
+
+  private Schema _pinotSchema;
+  private TypeDescription _orcSchema;
+  Reader _reader;
+  org.apache.orc.RecordReader _recordReader;
+  VectorizedRowBatch _reusableVectorizedRowBatch;
+
+  public static final String LOCAL_FS_PREFIX = "file://";
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(ORCRecordReader.class);
+
+  private void init(String inputPath, Schema schema) {
+    Configuration conf = new Configuration();
+    LOGGER.info("Creating segment for {}", inputPath);
+    try {
+      Path orcReaderPath = new Path(LOCAL_FS_PREFIX + inputPath);
+      LOGGER.info("orc reader path is {}", orcReaderPath);
+      _reader = OrcFile.createReader(orcReaderPath, OrcFile.readerOptions(conf));
+      _orcSchema = _reader.getSchema();
+      LOGGER.info("ORC schema is {}", _orcSchema.toJson());
+
+      _pinotSchema = schema;
+      if (_pinotSchema == null) {
+        LOGGER.warn("Pinot schema is not set in segment generator config");
+      }
+      _recordReader = _reader.rows(_reader.options().schema(_orcSchema));
+    } catch (Exception e) {
+      LOGGER.error("Caught exception initializing record reader at path {}", inputPath);
+      throw new RuntimeException(e);
+    }
+
+    // Create a row batch with max size 1
+    _reusableVectorizedRowBatch = _orcSchema.createRowBatch(1);
+  }
+
+  @Override
+  public void init(SegmentGeneratorConfig segmentGeneratorConfig) {
+    init(segmentGeneratorConfig.getInputFilePath(), segmentGeneratorConfig.getSchema());
+  }
+
+  @Override
+  public boolean hasNext() {
+    try {
+      return _recordReader.getProgress() != 1;
+    } catch (IOException e) {
+      LOGGER.error("Could not get next record");
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public GenericRow next()
+      throws IOException {
+    return next(new GenericRow());
+  }
+
+  @Override
+  public GenericRow next(GenericRow reuse)
+      throws IOException {
+    _recordReader.nextBatch(_reusableVectorizedRowBatch);
+    fillGenericRow(reuse, _reusableVectorizedRowBatch);
+    return reuse;
+  }
+
+  private void fillGenericRow(GenericRow genericRow, VectorizedRowBatch rowBatch) {
+    // ORC's TypeDescription is the equivalent of a schema. The way we will support ORC in Pinot
+    // will be to get the top level struct that contains all our fields and look through its
+    // children to determine the fields in our schemas.
+    if (_orcSchema.getCategory().equals(TypeDescription.Category.STRUCT)) {
+      for (int i = 0; i < _orcSchema.getChildren().size(); i++) {
+        // Get current column in schema
+        TypeDescription currColumn = _orcSchema.getChildren().get(i);
+        String currColumnName = _orcSchema.getFieldNames().get(i);
+        if (!_pinotSchema.getColumnNames().contains(currColumnName)) {
+          LOGGER.warn("Skipping column {} because it is not in pinot schema", currColumnName);
+          continue;
+        }
+        // ORC will keep your columns in the same order as the schema provided
+        ColumnVector vector = rowBatch.cols[i];
+        // Previous value set to null, not used except to save allocation memory in OrcMapredRecordReader
+        WritableComparable writableComparable = OrcMapredRecordReader.nextValue(vector, 0, currColumn, null);
+        genericRow.putField(currColumnName, getBaseObject(writableComparable));
+      }
+    } else {
+      throw new IllegalArgumentException("Not a valid schema");
+    }
+  }
+
+  /**
+   * A utility method to convert an Orc WritableComparable object to a generic Java object that can
+   * be added to a Pinot GenericRow object
+   *
+   * @param w Orc WritableComparable to convert
+   * @return Object that will be added to the Pinot GenericRow
+   */
+  private Object getBaseObject(WritableComparable w) {
+    Object obj = null;
+
+    if (w == null || NullWritable.class.isAssignableFrom(w.getClass())) {
+      obj = null;
+    } else if (BooleanWritable.class.isAssignableFrom(w.getClass())) {
+      obj = ((BooleanWritable) w).get();
+    } else if (ByteWritable.class.isAssignableFrom(w.getClass())) {
+      obj = ((ByteWritable) w).get();
+    } else if (ShortWritable.class.isAssignableFrom(w.getClass())) {
+      obj = ((ShortWritable) w).get();
+    } else if (IntWritable.class.isAssignableFrom(w.getClass())) {
+      obj = ((IntWritable) w).get();
+    } else if (LongWritable.class.isAssignableFrom(w.getClass())) {
+      obj = ((LongWritable) w).get();
+    } else if (FloatWritable.class.isAssignableFrom(w.getClass())) {
+      obj = ((FloatWritable) w).get();
+    } else if (DoubleWritable.class.isAssignableFrom(w.getClass())) {
+      obj = ((DoubleWritable) w).get();
+    } else if (BytesWritable.class.isAssignableFrom(w.getClass())) {
+      obj = ((BytesWritable) w).getBytes();
+    } else if (Text.class.isAssignableFrom(w.getClass())) {
+      obj = ((Text) w).toString();
+    } else if (OrcList.class.isAssignableFrom(w.getClass())) {
+      // TODO: This is probably multivalue columns
+      LOGGER.info("Skipping unsupported type: list");
+    } else {
+      LOGGER.info("Unknown type found: " + w.getClass().getSimpleName());
+      throw new IllegalArgumentException("Unknown type: " + w.getClass().getSimpleName());
+    }
+
+    return obj;
+  }
+
+  @Override
+  public void rewind()
+      throws IOException {
+    _recordReader = _reader.rows();
+  }
+
+  @Override
+  public org.apache.pinot.common.data.Schema getSchema() {
+    return _pinotSchema;
+  }
+
+  @Override
+  public void close()
+      throws IOException {
+    _recordReader.close();
+  }
+}
diff --git a/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java b/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java
new file mode 100644
index 0000000..c96e55d
--- /dev/null
+++ b/pinot-orc/src/test/java/org/apache/pinot/orc/data/readers/ORCRecordReaderTest.java
@@ -0,0 +1,116 @@
+package org.apache.pinot.orc.data.readers;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.Writer;
+import org.apache.pinot.common.data.DimensionFieldSpec;
+import org.apache.pinot.common.data.FieldSpec;
+import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.core.data.GenericRow;
+import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class ORCRecordReaderTest {
+  private static final File TEMP_DIR = FileUtils.getTempDirectory();
+  private static final File ORC_FILE = new File(TEMP_DIR.getAbsolutePath(), "my-file.orc");
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteQuietly(TEMP_DIR);
+    TypeDescription schema =
+        TypeDescription.fromString("struct<x:int,y:string>");
+
+    Writer writer = OrcFile.createWriter(new Path(ORC_FILE.getAbsolutePath()),
+        OrcFile.writerOptions(new Configuration())
+            .setSchema(schema));
+
+    VectorizedRowBatch batch = schema.createRowBatch();
+    LongColumnVector x = (LongColumnVector) batch.cols[0];
+    BytesColumnVector y = (BytesColumnVector) batch.cols[1];
+    for(int r=0; r < 5; ++r) {
+      int row = batch.size++;
+      x.vector[row] = r;
+      byte[] buffer = ("Last-" + (r * 3)).getBytes(StandardCharsets.UTF_8);
+      y.setRef(row, buffer, 0, buffer.length);
+      // If the batch is full, write it out and start over.
+      if (batch.size == batch.getMaxSize()) {
+        writer.addRowBatch(batch);
+        batch.reset();
+      }
+    }
+    if (batch.size != 0) {
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+  }
+
+  @Test
+  public void testReadData()
+      throws IOException {
+
+    ORCRecordReader orcRecordReader = new ORCRecordReader();
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig();
+    segmentGeneratorConfig.setInputFilePath(ORC_FILE.getAbsolutePath());
+    Schema schema = new Schema();
+    FieldSpec xFieldSpec = new DimensionFieldSpec("x", FieldSpec.DataType.LONG, true);
+    schema.addField(xFieldSpec);
+    FieldSpec yFieldSpec = new DimensionFieldSpec("y", FieldSpec.DataType.BYTES, true);
+    schema.addField(yFieldSpec);
+    segmentGeneratorConfig.setSchema(schema);
+    orcRecordReader.init(segmentGeneratorConfig);
+
+    List<GenericRow> genericRows = new ArrayList<>();
+    while (orcRecordReader.hasNext()) {
+      genericRows.add(orcRecordReader.next());
+    }
+    orcRecordReader.close();
+    Assert.assertEquals(genericRows.size(), 5, "Generic row size must be 5");
+
+    for (int i = 0; i < genericRows.size(); i++) {
+      Assert.assertEquals(genericRows.get(i).getValue("x"), i);
+      Assert.assertEquals(genericRows.get(i).getValue("y"), "Last-" + (i * 3));
+    }
+  }
+
+  @AfterClass
+  public void tearDown() {
+    FileUtils.deleteQuietly(TEMP_DIR);
+  }
+
+}
diff --git a/pom.xml b/pom.xml
index ad9486a..0756f8b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@
     <module>pinot-filesystem</module>
     <module>pinot-hadoop-filesystem</module>
     <module>pinot-azure-filesystem</module>
+    <module>pinot-orc</module>
   </modules>
 
   <licenses>
@@ -287,6 +288,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.pinot</groupId>
+        <artifactId>pinot-orc</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.pinot</groupId>
         <artifactId>pinot-transport</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -592,6 +598,16 @@
         </exclusions>
       </dependency>
       <dependency>
+        <groupId>org.apache.orc</groupId>
+        <artifactId>orc-core</artifactId>
+        <version>1.5.2</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.orc</groupId>
+        <artifactId>orc-mapreduce</artifactId>
+        <version>1.5.2</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jackson</groupId>
         <artifactId>jackson-core-asl</artifactId>
         <version>1.9.13</version>

