You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@orc.apache.org by om...@apache.org on 2016/10/17 18:25:10 UTC

[08/15] orc git commit: more updates

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
new file mode 100644
index 0000000..ae78cc4
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.csv;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.TaxiToOrc;
+
+/**
+ * Benchmark driver that reads every file named on the command line through
+ * {@code CsvReader}, using the schema loaded from "nyc-taxi.schema", and
+ * prints the total number of rows read.
+ */
+public class CsvScan {
+  /**
+   * Scans each file in {@code args} and prints the combined row count.
+   *
+   * @param args paths of the files to scan
+   * @throws Exception if the schema or any of the inputs cannot be read
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    long rowCount = 0;
+    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
+    // Every file is read with the same schema, so one batch is allocated up
+    // front and reused instead of creating a fresh batch for each file.
+    VectorizedRowBatch batch = schema.createRowBatch();
+    for(String filename: args) {
+      // NOTE(review): the reader is never closed here; CsvReader is not
+      // visible from this file, so confirm whether it exposes a close().
+      CsvReader reader = new CsvReader(new Path(filename), conf, schema);
+      while (reader.nextBatch(batch)) {
+        rowCount += batch.size;
+      }
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/json/JsonReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/json/JsonReader.java b/java/bench/src/java/org/apache/orc/bench/json/JsonReader.java
new file mode 100644
index 0000000..a5057e4
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/json/JsonReader.java
@@ -0,0 +1,278 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.json;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonStreamParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Reads a stream of JSON objects from a file and copies their fields into
+ * the columns of a {@link VectorizedRowBatch} according to a struct schema.
+ * Files whose name ends in ".gz" are wrapped in a {@link GZIPInputStream}.
+ * Call {@link #close()} when done to release the underlying file stream.
+ */
+public class JsonReader implements java.io.Closeable {
+  // Text is decoded and encoded as UTF-8 rather than with the platform
+  // default charset, so results do not vary from machine to machine.
+  private static final java.nio.charset.Charset UTF_8 =
+      java.nio.charset.StandardCharsets.UTF_8;
+
+  private final TypeDescription schema;
+  private final JsonStreamParser parser;
+  private final JsonConverter[] converters;
+  // Kept only so that close() can release the file.
+  private final InputStream input;
+
+  /**
+   * Copies one JSON value into one row of a column vector.
+   */
+  interface JsonConverter {
+    /**
+     * @param value the JSON value; {@code null} or JSON null marks the row null
+     * @param vect the column vector to write into
+     * @param row the row index within {@code vect}
+     */
+    void convert(JsonElement value, ColumnVector vect, int row);
+  }
+
+  /**
+   * Returns true when the value is absent or an explicit JSON null.
+   */
+  private static boolean isMissing(JsonElement value) {
+    return value == null || value.isJsonNull();
+  }
+
+  /**
+   * Flags the given row of the vector as null.
+   */
+  private static void markNull(ColumnVector vect, int row) {
+    vect.noNulls = false;
+    vect.isNull[row] = true;
+  }
+
+  /** Stores a JSON boolean as 1 or 0 in a {@link LongColumnVector}. */
+  static class BooleanColumnConverter implements JsonConverter {
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        LongColumnVector vector = (LongColumnVector) vect;
+        vector.vector[row] = value.getAsBoolean() ? 1 : 0;
+      }
+    }
+  }
+
+  /** Stores a JSON number as a long in a {@link LongColumnVector}. */
+  static class LongColumnConverter implements JsonConverter {
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        LongColumnVector vector = (LongColumnVector) vect;
+        vector.vector[row] = value.getAsLong();
+      }
+    }
+  }
+
+  /** Stores a JSON number as a double in a {@link DoubleColumnVector}. */
+  static class DoubleColumnConverter implements JsonConverter {
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        DoubleColumnVector vector = (DoubleColumnVector) vect;
+        vector.vector[row] = value.getAsDouble();
+      }
+    }
+  }
+
+  /** Stores a JSON string as UTF-8 bytes in a {@link BytesColumnVector}. */
+  static class StringColumnConverter implements JsonConverter {
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        BytesColumnVector vector = (BytesColumnVector) vect;
+        byte[] bytes = value.getAsString().getBytes(UTF_8);
+        vector.setRef(row, bytes, 0, bytes.length);
+      }
+    }
+  }
+
+  /**
+   * Decodes a hexadecimal JSON string (two digits per byte) into a
+   * {@link BytesColumnVector}.
+   */
+  static class BinaryColumnConverter implements JsonConverter {
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        BytesColumnVector vector = (BytesColumnVector) vect;
+        String binStr = value.getAsString();
+        // NOTE(review): integer division means a trailing unpaired hex digit
+        // is silently ignored; confirm that is the intended behavior.
+        byte[] bytes = new byte[binStr.length()/2];
+        for(int i=0; i < bytes.length; ++i) {
+          bytes[i] = (byte) Integer.parseInt(binStr.substring(i*2, i*2+2), 16);
+        }
+        vector.setRef(row, bytes, 0, bytes.length);
+      }
+    }
+  }
+
+  /**
+   * Parses a JSON string with {@link Timestamp#valueOf(String)} after
+   * replacing every 'T' and 'Z' with a space.
+   */
+  static class TimestampColumnConverter implements JsonConverter {
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        TimestampColumnVector vector = (TimestampColumnVector) vect;
+        // Plain character replacement gives the same result as the regex
+        // "[TZ]" without compiling a pattern for every value.
+        String text = value.getAsString().replace('T', ' ').replace('Z', ' ');
+        vector.set(row, Timestamp.valueOf(text));
+      }
+    }
+  }
+
+  /** Parses a JSON value's string form into a {@link DecimalColumnVector}. */
+  static class DecimalColumnConverter implements JsonConverter {
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        DecimalColumnVector vector = (DecimalColumnVector) vect;
+        vector.vector[row].set(HiveDecimal.create(value.getAsString()));
+      }
+    }
+  }
+
+  /**
+   * Converts a JSON object into a {@link StructColumnVector} by looking up
+   * each schema field name in the object and delegating to a per-field
+   * converter.
+   */
+  static class StructColumnConverter implements JsonConverter {
+    private final JsonConverter[] childrenConverters;
+    private final List<String> fieldNames;
+
+    public StructColumnConverter(TypeDescription schema) {
+      List<TypeDescription> kids = schema.getChildren();
+      childrenConverters = new JsonConverter[kids.size()];
+      for(int c=0; c < childrenConverters.length; ++c) {
+        childrenConverters[c] = createConverter(kids.get(c));
+      }
+      fieldNames = schema.getFieldNames();
+    }
+
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        StructColumnVector vector = (StructColumnVector) vect;
+        JsonObject obj = value.getAsJsonObject();
+        for(int c=0; c < childrenConverters.length; ++c) {
+          // A field missing from the object comes back as null, which the
+          // child converter records as a null value.
+          JsonElement elem = obj.get(fieldNames.get(c));
+          childrenConverters[c].convert(elem, vector.fields[c], row);
+        }
+      }
+    }
+  }
+
+  /**
+   * Converts a JSON array into a {@link ListColumnVector}, appending the
+   * elements to the end of the child vector.
+   */
+  static class ListColumnConverter implements JsonConverter {
+    private final JsonConverter childrenConverter;
+
+    public ListColumnConverter(TypeDescription schema) {
+      childrenConverter = createConverter(schema.getChildren().get(0));
+    }
+
+    @Override
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (isMissing(value)) {
+        markNull(vect, row);
+      } else {
+        ListColumnVector vector = (ListColumnVector) vect;
+        JsonArray obj = value.getAsJsonArray();
+        // This row's elements occupy [offset, offset + length) in the child.
+        vector.lengths[row] = obj.size();
+        vector.offsets[row] = vector.childCount;
+        vector.childCount += vector.lengths[row];
+        vector.child.ensureSize(vector.childCount, true);
+        for(int c=0; c < obj.size(); ++c) {
+          childrenConverter.convert(obj.get(c), vector.child,
+              (int) vector.offsets[row] + c);
+        }
+      }
+    }
+  }
+
+  /**
+   * Picks the converter that matches the category of the given type.
+   *
+   * @param schema the type to convert into
+   * @return a converter for that type
+   * @throws IllegalArgumentException if the category is not handled
+   */
+  static JsonConverter createConverter(TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return new LongColumnConverter();
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleColumnConverter();
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return new StringColumnConverter();
+      case DECIMAL:
+        return new DecimalColumnConverter();
+      case TIMESTAMP:
+        return new TimestampColumnConverter();
+      case BINARY:
+        return new BinaryColumnConverter();
+      case BOOLEAN:
+        return new BooleanColumnConverter();
+      case STRUCT:
+        return new StructColumnConverter(schema);
+      case LIST:
+        return new ListColumnConverter(schema);
+      default:
+        throw new IllegalArgumentException("Unhandled type " + schema);
+    }
+  }
+
+  /**
+   * Opens the file and prepares one converter per top-level schema field.
+   *
+   * @param path the file to read; a name ending in ".gz" is gunzipped
+   * @param conf the configuration used to resolve the file system
+   * @param schema the row schema, which must be a struct
+   * @throws IOException if the file cannot be opened
+   * @throws IllegalArgumentException if the schema root is not a struct or
+   *     contains a type with no converter
+   */
+  public JsonReader(Path path,
+                    Configuration conf,
+                    TypeDescription schema) throws IOException {
+    // Validate the schema and build the converters before touching the file
+    // so that a bad schema cannot leave an open stream behind.
+    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
+      throw new IllegalArgumentException("Root must be struct - " + schema);
+    }
+    this.schema = schema;
+    List<TypeDescription> fieldTypes = schema.getChildren();
+    converters = new JsonConverter[fieldTypes.size()];
+    for(int c = 0; c < converters.length; ++c) {
+      converters[c] = createConverter(fieldTypes.get(c));
+    }
+
+    FileSystem fs = path.getFileSystem(conf);
+    FSDataInputStream raw = fs.open(path);
+    InputStream stream = raw;
+    try {
+      if (path.getName().endsWith(".gz")) {
+        stream = new GZIPInputStream(raw);
+      }
+    } catch (IOException | RuntimeException e) {
+      // The gzip header could not be read; release the file before failing.
+      try {
+        raw.close();
+      } catch (IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+    input = stream;
+    parser = new JsonStreamParser(new InputStreamReader(input, UTF_8));
+  }
+
+  /**
+   * Resets the batch and fills it with up to {@code batch.getMaxSize()} rows.
+   *
+   * @param batch the batch to fill
+   * @return true if at least one row was read
+   * @throws IOException declared for callers; not thrown directly here
+   */
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+    batch.reset();
+    int maxSize = batch.getMaxSize();
+    List<String> fieldNames = schema.getFieldNames();
+    while (parser.hasNext() && batch.size < maxSize) {
+      JsonObject elem = parser.next().getAsJsonObject();
+      for(int c=0; c < converters.length; ++c) {
+        // look up each field to see if it is in the input; a missing field
+        // comes back as null and the converter marks the row as null.
+        JsonElement field = elem.get(fieldNames.get(c));
+        converters[c].convert(field, batch.cols[c], batch.size);
+      }
+      batch.size++;
+    }
+    return batch.size != 0;
+  }
+
+  /**
+   * Closes the underlying input stream.
+   *
+   * @throws IOException if closing the stream fails
+   */
+  @Override
+  public void close() throws IOException {
+    input.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/json/JsonScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/json/JsonScan.java b/java/bench/src/java/org/apache/orc/bench/json/JsonScan.java
new file mode 100644
index 0000000..4c64ac1
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/json/JsonScan.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.json;
+
+import com.google.gson.JsonStreamParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Benchmark driver that parses every file named on the command line with a
+ * {@link JsonStreamParser} and prints how many top-level JSON elements were
+ * read. Files whose name ends in ".gz" are gunzipped while reading.
+ */
+public class JsonScan {
+  /**
+   * Parses each file in {@code args} and prints the combined element count.
+   *
+   * @param args paths of the JSON files to scan
+   * @throws Exception if any of the files cannot be opened or read
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    long rowCount = 0;
+    for(String filename: args) {
+      Path path = new Path(filename);
+      FileSystem fs = path.getFileSystem(conf);
+      InputStream input = fs.open(path);
+      try {
+        if (filename.endsWith(".gz")) {
+          // If this constructor throws, input still refers to the raw
+          // stream, which the finally block closes.
+          input = new GZIPInputStream(input);
+        }
+        // Decode as UTF-8 instead of the platform default charset.
+        JsonStreamParser parser = new JsonStreamParser(
+            new InputStreamReader(input,
+                java.nio.charset.StandardCharsets.UTF_8));
+        while (parser.hasNext()) {
+          parser.next();
+          rowCount += 1;
+        }
+      } finally {
+        // Release the file even when parsing fails part way through.
+        input.close();
+      }
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/orc/OrcScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/orc/OrcScan.java b/java/bench/src/java/org/apache/orc/bench/orc/OrcScan.java
new file mode 100644
index 0000000..8ff2af1
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/orc/OrcScan.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+/**
+ * Benchmark driver that reads every ORC file named on the command line in
+ * batches and prints the total number of rows read.
+ */
+public class OrcScan {
+  /**
+   * Scans each file in {@code args} and prints the combined row count.
+   *
+   * @param args paths of the ORC files to scan
+   * @throws Exception if any of the files cannot be opened or read
+   */
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
+    long rowCount = 0;
+    for(String filename: args) {
+      Reader reader = OrcFile.createReader(new Path(filename), options);
+      // Each file carries its own schema, so the batch is built per file.
+      TypeDescription schema = reader.getSchema();
+      RecordReader rows = reader.rows();
+      try {
+        VectorizedRowBatch batch = schema.createRowBatch();
+        while (rows.nextBatch(batch)) {
+          rowCount += batch.size;
+        }
+      } finally {
+        // Close the record reader even if reading a batch fails.
+        rows.close();
+      }
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
new file mode 100644
index 0000000..29ae438
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.parquet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+
+/**
+ * Benchmark driver that reads every Parquet file named on the command line
+ * through {@link ParquetRecordReaderWrapper} and prints the total number of
+ * records read.
+ */
+public class ParquetScan {
+  /**
+   * Scans each file in {@code args} and prints the combined record count.
+   *
+   * @param args paths of the Parquet files to scan
+   * @throws Exception if any of the files cannot be opened or read
+   */
+  public static void main(String[] args) throws Exception {
+    JobConf conf = new JobConf();
+    long rowCount = 0;
+    ParquetInputFormat<ArrayWritable> inputFormat =
+        new ParquetInputFormat<>(DataWritableReadSupport.class);
+
+    NullWritable nada = NullWritable.get();
+    for(String filename: args) {
+      // The split starts at offset 0 with length Long.MAX_VALUE and lists
+      // no host locations.
+      FileSplit split = new FileSplit(new Path(filename), 0, Long.MAX_VALUE,
+          new String[]{});
+      RecordReader<NullWritable,ArrayWritable> recordReader =
+          new ParquetRecordReaderWrapper(inputFormat, split, conf,
+              Reporter.NULL);
+      try {
+        ArrayWritable value = recordReader.createValue();
+        while (recordReader.next(nada, value)) {
+          rowCount += 1;
+        }
+      } finally {
+        // Close the record reader even if reading a record fails.
+        recordReader.close();
+      }
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}

http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 9b77760..c34a4f4 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -257,11 +257,6 @@
         <version>0.3</version>
       </dependency>
       <dependency>
-        <groupId>org.apache.commons</groupId>
-        <artifactId>commons-csv</artifactId>
-        <version>1.4</version>
-      </dependency>
-      <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
         <version>1.8.1</version>
@@ -273,6 +268,11 @@
         <version>1.8.1</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-csv</artifactId>
+        <version>1.4</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.version}</version>
@@ -371,6 +371,11 @@
         <version>${storage-api.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-hadoop</artifactId>
+        <version>1.8.1</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jettison</groupId>
         <artifactId>jettison</artifactId>
         <version>1.1</version>