Posted to commits@orc.apache.org by om...@apache.org on 2016/10/17 18:25:10 UTC
[08/15] orc git commit: more updates
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
new file mode 100644
index 0000000..ae78cc4
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/csv/CsvScan.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.csv;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+import org.apache.orc.bench.TaxiToOrc;
+
+public class CsvScan {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    long rowCount = 0;
+    TypeDescription schema = TaxiToOrc.loadSchema("nyc-taxi.schema");
+    for(String filename: args) {
+      CsvReader reader = new CsvReader(new Path(filename), conf, schema);
+      VectorizedRowBatch batch = schema.createRowBatch();
+      while (reader.nextBatch(batch)) {
+        rowCount += batch.size;
+      }
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}
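
The scan loop above relies on the standard VectorizedRowBatch contract:
nextBatch() refills the batch and returns false once the input is exhausted,
and batch.size reports how many rows in the batch are valid. For reference,
here is a minimal sketch of pulling values back out of one column of such a
batch; the column index and long type are illustrative assumptions, not
something the benchmark itself does:

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

    public class BatchSum {
      // Sum column 0 of a batch, assuming it is a long column.
      static long sumColumn0(VectorizedRowBatch batch) {
        LongColumnVector col = (LongColumnVector) batch.cols[0];
        long sum = 0;
        for (int r = 0; r < batch.size; ++r) {
          // isRepeating means every row shares the value at index 0.
          int idx = col.isRepeating ? 0 : r;
          if (col.noNulls || !col.isNull[idx]) {
            sum += col.vector[idx];
          }
        }
        return sum;
      }
    }
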
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/json/JsonReader.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/json/JsonReader.java b/java/bench/src/java/org/apache/orc/bench/json/JsonReader.java
new file mode 100644
index 0000000..a5057e4
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/json/JsonReader.java
@@ -0,0 +1,281 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.orc.bench.json;
+
+import com.google.gson.JsonArray;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonStreamParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.TypeDescription;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.sql.Timestamp;
+import java.util.List;
+import java.util.zip.GZIPInputStream;
+
+public class JsonReader {
+  private final TypeDescription schema;
+  private final JsonStreamParser parser;
+  private final JsonConverter[] converters;
+
+  interface JsonConverter {
+    void convert(JsonElement value, ColumnVector vect, int row);
+  }
+
+  static class BooleanColumnConverter implements JsonConverter {
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        LongColumnVector vector = (LongColumnVector) vect;
+        vector.vector[row] = value.getAsBoolean() ? 1 : 0;
+      }
+    }
+  }
+
+  static class LongColumnConverter implements JsonConverter {
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        LongColumnVector vector = (LongColumnVector) vect;
+        vector.vector[row] = value.getAsLong();
+      }
+    }
+  }
+
+  static class DoubleColumnConverter implements JsonConverter {
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        DoubleColumnVector vector = (DoubleColumnVector) vect;
+        vector.vector[row] = value.getAsDouble();
+      }
+    }
+  }
+
+  static class StringColumnConverter implements JsonConverter {
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        BytesColumnVector vector = (BytesColumnVector) vect;
+        byte[] bytes = value.getAsString().getBytes(StandardCharsets.UTF_8);
+        vector.setRef(row, bytes, 0, bytes.length);
+      }
+    }
+  }
+
+  static class BinaryColumnConverter implements JsonConverter {
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        BytesColumnVector vector = (BytesColumnVector) vect;
+        // binary values arrive as hex strings: two hex digits per byte
+        String binStr = value.getAsString();
+        byte[] bytes = new byte[binStr.length()/2];
+        for(int i=0; i < bytes.length; ++i) {
+          bytes[i] = (byte) Integer.parseInt(binStr.substring(i*2, i*2+2), 16);
+        }
+        vector.setRef(row, bytes, 0, bytes.length);
+      }
+    }
+  }
+
+  static class TimestampColumnConverter implements JsonConverter {
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        TimestampColumnVector vector = (TimestampColumnVector) vect;
+        // strip the ISO 8601 "T" and "Z" markers so Timestamp.valueOf can parse
+        vector.set(row, Timestamp.valueOf(value.getAsString()
+            .replaceAll("[TZ]", " ")));
+      }
+    }
+  }
+
+  static class DecimalColumnConverter implements JsonConverter {
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        DecimalColumnVector vector = (DecimalColumnVector) vect;
+        vector.vector[row].set(HiveDecimal.create(value.getAsString()));
+      }
+    }
+  }
+
+  static class StructColumnConverter implements JsonConverter {
+    private JsonConverter[] childrenConverters;
+    private List<String> fieldNames;
+
+    public StructColumnConverter(TypeDescription schema) {
+      List<TypeDescription> kids = schema.getChildren();
+      childrenConverters = new JsonConverter[kids.size()];
+      for(int c=0; c < childrenConverters.length; ++c) {
+        childrenConverters[c] = createConverter(kids.get(c));
+      }
+      fieldNames = schema.getFieldNames();
+    }
+
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        StructColumnVector vector = (StructColumnVector) vect;
+        JsonObject obj = value.getAsJsonObject();
+        for(int c=0; c < childrenConverters.length; ++c) {
+          JsonElement elem = obj.get(fieldNames.get(c));
+          childrenConverters[c].convert(elem, vector.fields[c], row);
+        }
+      }
+    }
+  }
+
+  static class ListColumnConverter implements JsonConverter {
+    private JsonConverter childrenConverter;
+
+    public ListColumnConverter(TypeDescription schema) {
+      childrenConverter = createConverter(schema.getChildren().get(0));
+    }
+
+    public void convert(JsonElement value, ColumnVector vect, int row) {
+      if (value == null || value.isJsonNull()) {
+        vect.noNulls = false;
+        vect.isNull[row] = true;
+      } else {
+        ListColumnVector vector = (ListColumnVector) vect;
+        JsonArray obj = value.getAsJsonArray();
+        vector.lengths[row] = obj.size();
+        vector.offsets[row] = vector.childCount;
+        vector.childCount += vector.lengths[row];
+        vector.child.ensureSize(vector.childCount, true);
+        for(int c=0; c < obj.size(); ++c) {
+          childrenConverter.convert(obj.get(c), vector.child,
+              (int) vector.offsets[row] + c);
+        }
+      }
+    }
+  }
+
+  static JsonConverter createConverter(TypeDescription schema) {
+    switch (schema.getCategory()) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return new LongColumnConverter();
+      case FLOAT:
+      case DOUBLE:
+        return new DoubleColumnConverter();
+      case CHAR:
+      case VARCHAR:
+      case STRING:
+        return new StringColumnConverter();
+      case DECIMAL:
+        return new DecimalColumnConverter();
+      case TIMESTAMP:
+        return new TimestampColumnConverter();
+      case BINARY:
+        return new BinaryColumnConverter();
+      case BOOLEAN:
+        return new BooleanColumnConverter();
+      case STRUCT:
+        return new StructColumnConverter(schema);
+      case LIST:
+        return new ListColumnConverter(schema);
+      default:
+        throw new IllegalArgumentException("Unhandled type " + schema);
+    }
+  }
+
+  public JsonReader(Path path,
+                    Configuration conf,
+                    TypeDescription schema) throws IOException {
+    this.schema = schema;
+    FileSystem fs = path.getFileSystem(conf);
+    FSDataInputStream raw = fs.open(path);
+    String name = path.getName();
+    int lastDot = name.lastIndexOf(".");
+    InputStream input = raw;
+    if (lastDot >= 0) {
+      if (".gz".equals(name.substring(lastDot))) {
+        input = new GZIPInputStream(raw);
+      }
+    }
+    parser = new JsonStreamParser(new InputStreamReader(input, StandardCharsets.UTF_8));
+    if (schema.getCategory() != TypeDescription.Category.STRUCT) {
+      throw new IllegalArgumentException("Root must be struct - " + schema);
+    }
+    List<TypeDescription> fieldTypes = schema.getChildren();
+    converters = new JsonConverter[fieldTypes.size()];
+    for(int c = 0; c < converters.length; ++c) {
+      converters[c] = createConverter(fieldTypes.get(c));
+    }
+  }
+
+  public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
+    batch.reset();
+    int maxSize = batch.getMaxSize();
+    List<String> fieldNames = schema.getFieldNames();
+    while (parser.hasNext() && batch.size < maxSize) {
+      JsonObject elem = parser.next().getAsJsonObject();
+      for(int c=0; c < converters.length; ++c) {
+        // look up each field to see if it is in the input, otherwise
+        // set it to null.
+        JsonElement field = elem.get(fieldNames.get(c));
+        if (field == null) {
+          batch.cols[c].noNulls = false;
+          batch.cols[c].isNull[batch.size] = true;
+        } else {
+          converters[c].convert(field, batch.cols[c], batch.size);
+        }
+      }
+      batch.size++;
+    }
+    return batch.size != 0;
+  }
+}
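
JsonReader follows the same reader contract as the CsvReader used by CsvScan:
construct it with a path, a configuration, and a root struct schema, then
drain it batch by batch. Note that nextBatch() treats a missing key as null
rather than failing, so sparse JSON records are handled gracefully. A usage
sketch, with a made-up schema and file name for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.TypeDescription;
    import org.apache.orc.bench.json.JsonReader;

    public class JsonReaderDemo {
      public static void main(String[] args) throws Exception {
        // Illustrative schema and file name; any root struct works.
        TypeDescription schema =
            TypeDescription.fromString("struct<id:bigint,name:string>");
        JsonReader reader = new JsonReader(new Path("rows.json.gz"),
                                           new Configuration(), schema);
        VectorizedRowBatch batch = schema.createRowBatch();
        long rows = 0;
        while (reader.nextBatch(batch)) {
          rows += batch.size;
        }
        System.out.println("Rows: " + rows);
      }
    }
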
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/json/JsonScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/json/JsonScan.java b/java/bench/src/java/org/apache/orc/bench/json/JsonScan.java
new file mode 100644
index 0000000..4c64ac1
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/json/JsonScan.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.json;
+
+import com.google.gson.JsonStreamParser;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.zip.GZIPInputStream;
+
+public class JsonScan {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    long rowCount = 0;
+    for(String filename: args) {
+      Path path = new Path(filename);
+      FileSystem fs = path.getFileSystem(conf);
+      FSDataInputStream raw = fs.open(path);
+      int lastDot = filename.lastIndexOf(".");
+      InputStream input = raw;
+      if (lastDot >= 0) {
+        if (".gz".equals(filename.substring(lastDot))) {
+          input = new GZIPInputStream(raw);
+        }
+      }
+      JsonStreamParser parser = new JsonStreamParser(
+          new InputStreamReader(input, StandardCharsets.UTF_8));
+      while (parser.hasNext()) {
+        parser.next();
+        rowCount += 1;
+      }
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}
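
The ".gz" sniffing here duplicates the logic in JsonReader's constructor.
A sketch of how the two call sites could share it — the helper class and
method names are hypothetical, not part of this commit:

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPInputStream;

    public class CompressionUtil {
      // Hypothetical helper: wrap the raw stream in a GZIPInputStream when
      // the file name ends in ".gz"; otherwise pass it through untouched.
      static InputStream maybeDecompress(String fileName,
                                         InputStream raw) throws IOException {
        return fileName.endsWith(".gz") ? new GZIPInputStream(raw) : raw;
      }
    }
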
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/orc/OrcScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/orc/OrcScan.java b/java/bench/src/java/org/apache/orc/bench/orc/OrcScan.java
new file mode 100644
index 0000000..8ff2af1
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/orc/OrcScan.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.orc.OrcFile;
+import org.apache.orc.Reader;
+import org.apache.orc.RecordReader;
+import org.apache.orc.TypeDescription;
+
+public class OrcScan {
+  public static void main(String[] args) throws Exception {
+    Configuration conf = new Configuration();
+    OrcFile.ReaderOptions options = OrcFile.readerOptions(conf);
+    long rowCount = 0;
+    for(String filename: args) {
+      Reader reader = OrcFile.createReader(new Path(filename), options);
+      TypeDescription schema = reader.getSchema();
+      RecordReader rows = reader.rows();
+      VectorizedRowBatch batch = schema.createRowBatch();
+      while (rows.nextBatch(batch)) {
+        rowCount += batch.size;
+      }
+      rows.close();
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}
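
OrcScan reads every column of every row. Because ORC is columnar, a scan can
also be restricted to a subset of columns, which is often the interesting case
to benchmark. A hedged sketch, assuming Reader.Options and its include() call
behave as in the ORC reader API of this era; the choice of columns is purely
illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
    import org.apache.orc.OrcFile;
    import org.apache.orc.Reader;
    import org.apache.orc.RecordReader;
    import org.apache.orc.TypeDescription;

    public class OrcProjectionScan {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Reader reader = OrcFile.createReader(new Path(args[0]),
                                             OrcFile.readerOptions(conf));
        TypeDescription schema = reader.getSchema();
        // Column ids are assigned in a pre-order walk of the schema;
        // id 0 is the root struct. Include the root and its first child.
        boolean[] include = new boolean[schema.getMaximumId() + 1];
        include[0] = true;
        include[1] = true;
        RecordReader rows = reader.rows(new Reader.Options().include(include));
        VectorizedRowBatch batch = schema.createRowBatch();
        long rowCount = 0;
        while (rows.nextBatch(batch)) {
          rowCount += batch.size;
        }
        rows.close();
        System.out.println("Rows read: " + rowCount);
      }
    }
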
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
----------------------------------------------------------------------
diff --git a/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
new file mode 100644
index 0000000..29ae438
--- /dev/null
+++ b/java/bench/src/java/org/apache/orc/bench/parquet/ParquetScan.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.bench.parquet;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport;
+import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.parquet.hadoop.ParquetInputFormat;
+
+public class ParquetScan {
+  public static void main(String[] args) throws Exception {
+    JobConf conf = new JobConf();
+    long rowCount = 0;
+    ParquetInputFormat<ArrayWritable> inputFormat =
+        new ParquetInputFormat<>(DataWritableReadSupport.class);
+
+    NullWritable nada = NullWritable.get();
+    for(String filename: args) {
+      FileSplit split = new FileSplit(new Path(filename), 0, Long.MAX_VALUE,
+          new String[]{});
+      RecordReader<NullWritable,ArrayWritable> recordReader =
+          new ParquetRecordReaderWrapper(inputFormat, split, conf,
+              Reporter.NULL);
+      ArrayWritable value = recordReader.createValue();
+      while (recordReader.next(nada, value)) {
+        rowCount += 1;
+      }
+      recordReader.close();
+    }
+    System.out.println("Rows read: " + rowCount);
+  }
+}
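
Unlike OrcScan, this Parquet path goes through Hive's row-oriented mapred
wrapper rather than a vectorized reader, so it materializes one ArrayWritable
per row — worth keeping in mind when comparing the benchmark numbers. Each
row's column values sit in the wrapped Writable array. A small illustrative
sketch of unpacking one row:

    import org.apache.hadoop.io.ArrayWritable;
    import org.apache.hadoop.io.Writable;

    public class ParquetRowPeek {
      // Illustrative only: print each column value of one row.
      static void printRow(ArrayWritable row) {
        Writable[] fields = row.get();
        for (int c = 0; c < fields.length; ++c) {
          System.out.println("col " + c + " = " + fields[c]);
        }
      }
    }
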
http://git-wip-us.apache.org/repos/asf/orc/blob/5b37113b/java/pom.xml
----------------------------------------------------------------------
diff --git a/java/pom.xml b/java/pom.xml
index 9b77760..c34a4f4 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -257,11 +257,6 @@
         <version>0.3</version>
       </dependency>
       <dependency>
-        <groupId>org.apache.commons</groupId>
-        <artifactId>commons-csv</artifactId>
-        <version>1.4</version>
-      </dependency>
-      <dependency>
         <groupId>org.apache.avro</groupId>
         <artifactId>avro</artifactId>
         <version>1.8.1</version>
@@ -273,6 +268,11 @@
         <version>1.8.1</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-csv</artifactId>
+        <version>1.4</version>
+      </dependency>
+      <dependency>
         <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-common</artifactId>
         <version>${hadoop.version}</version>
@@ -371,6 +371,11 @@
         <version>${storage-api.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.parquet</groupId>
+        <artifactId>parquet-hadoop</artifactId>
+        <version>1.8.1</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.jettison</groupId>
         <artifactId>jettison</artifactId>
         <version>1.1</version>
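
These hunks move commons-csv into alphabetical order and add parquet-hadoop
alongside the other managed dependencies. Assuming these entries sit under
dependencyManagement in the parent java/pom.xml, a child module such as bench
would then pick up the managed version by declaring the dependency without
one — a hypothetical sketch:

    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
    </dependency>
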