Posted to commits@tajo.apache.org by hy...@apache.org on 2014/10/01 10:20:40 UTC
git commit: TAJO-1044: Implement nextFetch(RowBlock) of Parquet scanner.
Repository: tajo
Updated Branches:
refs/heads/block_iteration 95af3cce9 -> fade0a87b
TAJO-1044: Implement nextFetch(RowBlock) of Parquet scanner.
Closes #172
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fade0a87
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fade0a87
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fade0a87
Branch: refs/heads/block_iteration
Commit: fade0a87be6beba1cd24838d61bf000948454e96
Parents: 95af3cc
Author: Hyunsik Choi <hy...@apache.org>
Authored: Wed Oct 1 01:18:47 2014 -0700
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Wed Oct 1 01:18:47 2014 -0700
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/catalog/CatalogUtil.java | 20 +-
.../src/main/proto/CatalogProtos.proto | 5 +-
.../TestCTASQuery/CtasWithManagedTable.sql | 2 +-
.../org/apache/tajo/storage/FileScanner.java | 2 +-
.../org/apache/tajo/storage/MergeScanner.java | 2 +-
.../org/apache/tajo/storage/NullScanner.java | 9 +
.../storage/parquet/TajoRecordConverter.java | 57 ++-
.../storage/parquet/TajoSchemaConverter.java | 6 +-
.../tajo/storage/parquet/TajoWriteSupport.java | 5 +-
.../tuple/offheap/HeapTupleBytesComparator.java | 1 +
.../hadoop/ParquetRowBlockConverter.java | 353 +++++++++++++++++++
.../hadoop/ParquetRowBlockParquetReader.java | 234 ++++++++++++
.../parquet/hadoop/ParquetRowBlockScanner.java | 126 +++++++
.../parquet/hadoop/ParquetRowDirectReader.java | 185 ++++++++++
.../parquet/hadoop/ParquetRowMaterializer.java | 74 ++++
.../parquet/hadoop/ParquetRowReadSupport.java | 107 ++++++
.../src/main/resources/storage-default.xml | 18 +-
.../apache/tajo/storage/TestNextFetches.java | 299 ++++------------
.../storage/parquet/TestSchemaConverter.java | 2 +-
.../src/test/resources/storage-default.xml | 18 +-
21 files changed, 1269 insertions(+), 258 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 40b62de..cab6966 100644
--- a/CHANGES
+++ b/CHANGES
@@ -2,6 +2,8 @@ Tajo Change Log
Block Iteration - branch
+ TAJO-1044: Implement nextFetch(RowBlock) of Parquet scanner. (hyunsik)
+
TAJO-1084: Generated classes should access directly UnSafeTuple and
RowWriter. (hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
index a2d9796..96dac62 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
+++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java
@@ -277,25 +277,7 @@ public class CatalogUtil {
}
public static StoreType getStoreType(final String typeStr) {
- if (typeStr.equalsIgnoreCase(StoreType.CSV.name())) {
- return StoreType.CSV;
- } else if (typeStr.equalsIgnoreCase(StoreType.RAW.name())) {
- return StoreType.RAW;
- } else if (typeStr.equalsIgnoreCase(StoreType.ROWFILE.name())) {
- return StoreType.ROWFILE;
- } else if (typeStr.equalsIgnoreCase(StoreType.RCFILE.name())) {
- return StoreType.RCFILE;
- } else if (typeStr.equalsIgnoreCase(StoreType.TREVNI.name())) {
- return StoreType.TREVNI;
- } else if (typeStr.equalsIgnoreCase(StoreType.PARQUET.name())) {
- return StoreType.PARQUET;
- } else if (typeStr.equalsIgnoreCase(StoreType.SEQUENCEFILE.name())) {
- return StoreType.SEQUENCEFILE;
- } else if (typeStr.equalsIgnoreCase(StoreType.AVRO.name())) {
- return StoreType.AVRO;
- } else {
- return null;
- }
+ return StoreType.valueOf(typeStr.toUpperCase());
}
public static TableMeta newTableMeta(StoreType type) {
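A side effect of this refactor is worth noting: the old if/else chain returned null for an unrecognized type string, while StoreType.valueOf throws IllegalArgumentException. A minimal standalone sketch of that behavioral difference (StoreKind below is an illustrative stand-in for the generated CatalogProtos.StoreType enum):

// Standalone sketch; StoreKind is an illustrative stand-in for the generated StoreType enum.
public class StoreTypeLookupSketch {

  enum StoreKind { CSV, RAW, PARQUET, BLOCK_PARQUET }

  // Old behavior: an unrecognized name maps to null.
  static StoreKind lookupOld(String s) {
    for (StoreKind k : StoreKind.values()) {
      if (k.name().equalsIgnoreCase(s)) {
        return k;
      }
    }
    return null;
  }

  // New behavior: Enum.valueOf throws IllegalArgumentException on an unrecognized name.
  static StoreKind lookupNew(String s) {
    return StoreKind.valueOf(s.toUpperCase());
  }

  public static void main(String[] args) {
    System.out.println(lookupOld("no_such_format")); // prints: null
    try {
      lookupNew("no_such_format");
    } catch (IllegalArgumentException e) {
      System.out.println("valueOf threw: " + e.getMessage());
    }
  }
}

Callers that tested for a null return now have to handle the exception (or validate the name first).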
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
----------------------------------------------------------------------
diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
index c45e1ef..2cfc1a8 100644
--- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
+++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto
@@ -35,8 +35,9 @@ enum StoreType {
HCFILE = 6;
TREVNI = 7;
PARQUET = 8;
- SEQUENCEFILE = 9;
- AVRO = 10;
+ BLOCK_PARQUET = 9;
+ SEQUENCEFILE = 10;
+ AVRO = 11;
}
enum OrderType {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql
index 1dd5e90..ce6ebdb 100644
--- a/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql
+++ b/tajo-core/src/test/resources/queries/TestCTASQuery/CtasWithManagedTable.sql
@@ -1,4 +1,4 @@
-create table "MANAGED_TABLE1" (col1 float, col2 float) using rcfile as
+create table "MANAGED_TABLE1" (col1 float, col2 float) using parquet as
select
sum(l_orderkey) as total1,
avg(l_partkey) as total2
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
index d4357e3..0c6bd48 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/FileScanner.java
@@ -84,7 +84,7 @@ public abstract class FileScanner implements Scanner {
@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
- throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
+ throw new UnimplementedException(getClass().getSimpleName() + "::nextFetch(OffHeapRowBlock) is not implemented");
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
index 890455a..9fdc483 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/MergeScanner.java
@@ -117,7 +117,7 @@ public class MergeScanner implements Scanner {
@Override
public boolean nextFetch(OffHeapRowBlock rowBlock) {
- throw new UnimplementedException("nextFetch(OffHeapRowBlock) is not implemented");
+ throw new UnimplementedException("MergeScanner::nextFetch(OffHeapRowBlock) is not implemented.");
}
@Override
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
index 4cec67d..56908ed 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/NullScanner.java
@@ -19,7 +19,9 @@ package org.apache.tajo.storage; /**
import org.apache.hadoop.conf.Configuration;
import org.apache.tajo.catalog.Schema;
import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.exception.UnimplementedException;
import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
import java.io.IOException;
@@ -36,6 +38,13 @@ public class NullScanner extends FileScanner {
}
@Override
+ public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+ progress = 1.0f;
+
+ return false;
+ }
+
+ @Override
public void reset() throws IOException {
progress = 0.0f;
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7c3d79d..4036ddd 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -117,12 +117,18 @@ public class TajoRecordConverter extends GroupConverter {
return new FieldFloat4Converter(parent);
case FLOAT8:
return new FieldFloat8Converter(parent);
+ case TIMESTAMP:
+ return new FieldTimestampConverter(parent);
+ case TIME:
+ return new FieldTimeConverter(parent);
+ case DATE:
+ return new FieldDateConverter(parent);
+ case TEXT:
+ return new FieldTextConverter(parent);
case INET4:
return new FieldInet4Converter(parent);
case INET6:
throw new RuntimeException("No converter for INET6");
- case TEXT:
- return new FieldTextConverter(parent);
case PROTOBUF:
return new FieldProtobufConverter(parent, dataType);
case BLOB:
@@ -209,7 +215,7 @@ public class TajoRecordConverter extends GroupConverter {
@Override
final public void addInt(int value) {
- parent.add(DatumFactory.createBit((byte)(value & 0xff)));
+ parent.add(DatumFactory.createBit((byte) (value & 0xff)));
}
}
@@ -235,7 +241,7 @@ public class TajoRecordConverter extends GroupConverter {
@Override
final public void addInt(int value) {
- parent.add(DatumFactory.createInt2((short)value));
+ parent.add(DatumFactory.createInt2((short) value));
}
}
@@ -329,8 +335,8 @@ public class TajoRecordConverter extends GroupConverter {
}
@Override
- final public void addBinary(Binary value) {
- parent.add(DatumFactory.createInet4(value.getBytes()));
+ public void addInt(int value) {
+ parent.add(DatumFactory.createInet4(value));
}
}
@@ -360,6 +366,45 @@ public class TajoRecordConverter extends GroupConverter {
}
}
+ static final class FieldTimestampConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldTimestampConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addLong(long value) {
+ parent.add(DatumFactory.createTimestamp(value));
+ }
+ }
+
+ static final class FieldTimeConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldTimeConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addLong(long value) {
+ parent.add(DatumFactory.createTime(value));
+ }
+ }
+
+ static final class FieldDateConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldDateConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addInt(int value) {
+ parent.add(DatumFactory.createDate(value));
+ }
+ }
+
static final class FieldProtobufConverter extends PrimitiveConverter {
private final ParentValueContainer parent;
private final DataType dataType;
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
index 2592231..cfca618 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoSchemaConverter.java
@@ -161,9 +161,14 @@ public class TajoSchemaConverter {
case BIT:
case INT2:
case INT4:
+ case DATE:
+ case INET4:
return primitive(column.getSimpleName(),
PrimitiveType.PrimitiveTypeName.INT32);
case INT8:
+ case TIMESTAMP:
+ case TIME:
return primitive(column.getSimpleName(),
PrimitiveType.PrimitiveTypeName.INT64);
case FLOAT4:
@@ -183,7 +188,6 @@ public class TajoSchemaConverter {
case BLOB:
return primitive(column.getSimpleName(),
PrimitiveType.PrimitiveTypeName.BINARY);
- case INET4:
case INET6:
return primitive(column.getSimpleName(),
PrimitiveType.PrimitiveTypeName.BINARY);
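Taken together with the TajoWriteSupport hunk below, the net effect is that DATE and INET4 values are now stored as Parquet INT32 and TIMESTAMP/TIME as INT64, rather than as BINARY. A standalone sketch of the resulting logical-to-physical mapping (both enums below are illustrative stand-ins, not the real Tajo or Parquet types):

// Sketch of the logical-to-physical mapping after this commit; both enums are local stand-ins.
public class TypeMappingSketch {

  enum TajoType { BOOLEAN, BIT, INT2, INT4, DATE, INET4, INT8, TIMESTAMP, TIME,
                  FLOAT4, FLOAT8, TEXT, BLOB, INET6 }

  enum ParquetPhysical { BOOLEAN, INT32, INT64, FLOAT, DOUBLE, BINARY }

  static ParquetPhysical physicalOf(TajoType t) {
    switch (t) {
      case BOOLEAN:
        return ParquetPhysical.BOOLEAN;
      case BIT: case INT2: case INT4:
      case DATE: case INET4:                    // DATE and INET4 newly map to INT32
        return ParquetPhysical.INT32;
      case INT8: case TIMESTAMP: case TIME:     // TIMESTAMP and TIME newly map to INT64
        return ParquetPhysical.INT64;
      case FLOAT4:
        return ParquetPhysical.FLOAT;
      case FLOAT8:
        return ParquetPhysical.DOUBLE;
      default:                                  // TEXT, BLOB, INET6, ...
        return ParquetPhysical.BINARY;
    }
  }

  public static void main(String[] args) {
    System.out.println(physicalOf(TajoType.DATE));      // INT32
    System.out.println(physicalOf(TajoType.TIMESTAMP)); // INT64
  }
}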
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
index 35165de..c0164d1 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/storage/parquet/TajoWriteSupport.java
@@ -121,9 +121,13 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
case BIT:
case INT2:
case INT4:
+ case INET4:
+ case DATE:
recordConsumer.addInteger(datum.asInt4());
break;
case INT8:
+ case TIMESTAMP:
+ case TIME:
recordConsumer.addLong(datum.asInt8());
break;
case FLOAT4:
@@ -138,7 +142,6 @@ public class TajoWriteSupport extends WriteSupport<Tuple> {
break;
case PROTOBUF:
case BLOB:
- case INET4:
case INET6:
recordConsumer.addBinary(Binary.fromByteArray(datum.asByteArray()));
break;
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
index 5298286..39ce930 100644
--- a/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
+++ b/tajo-storage/src/main/java/org/apache/tajo/tuple/offheap/HeapTupleBytesComparator.java
@@ -56,6 +56,7 @@ public class HeapTupleBytesComparator {
for (int i = 0; i < minWords * Longs.BYTES; i += Longs.BYTES) {
long lw = UNSAFE.getLong(t1.data, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET + offset1);
long rw = UNSAFE.getLong(t2.data, UnsafeUtil.ARRAY_BYTE_BASE_OFFSET + offset2);
+
long diff = lw ^ rw;
if (diff != 0) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockConverter.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockConverter.java
new file mode 100644
index 0000000..5129ea3
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockConverter.java
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.common.TajoDataTypes.DataType;
+import org.apache.tajo.datum.ProtobufDatumFactory;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.GroupType;
+import parquet.schema.Type;
+
+/**
+ * Converter to convert a Parquet record into a Tajo Tuple.
+ */
+public class ParquetRowBlockConverter extends GroupConverter {
+ private final GroupType parquetSchema;
+ private final Schema tajoReadSchema;
+ private final int[] projectionMap;
+ private final int tupleSize;
+
+ private final Converter[] converters;
+
+ private Object [] currentTuple;
+
+ /**
+ * Creates a new ParquetRowBlockConverter.
+ *
+ * @param parquetSchema The Parquet schema of the projection.
+ * @param tajoReadSchema The Tajo schema of the table.
+ * @param projectionMap An array mapping the projection column to the column
+ * index in the table.
+ */
+ public ParquetRowBlockConverter(GroupType parquetSchema, Schema tajoReadSchema, int[] projectionMap) {
+ this.parquetSchema = parquetSchema;
+ this.tajoReadSchema = tajoReadSchema;
+ this.projectionMap = projectionMap;
+ this.tupleSize = tajoReadSchema.size();
+
+ // The projectionMap.length does not match parquetSchema.getFieldCount()
+ // when the projection contains NULL_TYPE columns. We will skip over the
+ // NULL_TYPE columns when we construct the converters and populate the
+ // NULL_TYPE columns with NullDatums in start().
+ int index = 0;
+ this.converters = new Converter[parquetSchema.getFieldCount()];
+ for (int i = 0; i < projectionMap.length; ++i) {
+ final int projectionIndex = projectionMap[i];
+ Column column = tajoReadSchema.getColumn(projectionIndex);
+ if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE) {
+ continue;
+ }
+ Type type = parquetSchema.getType(index);
+ converters[index] = newConverter(column, type, new ParentValueContainer() {
+ @Override
+ void add(Object value) {
+ ParquetRowBlockConverter.this.set(projectionIndex, value);
+ }
+ });
+ ++index;
+ }
+ }
+
+ private void set(int index, Object value) {
+ currentTuple[index] = value;
+ }
+
+ private Converter newConverter(Column column, Type type,
+ ParentValueContainer parent) {
+ DataType dataType = column.getDataType();
+ switch (dataType.getType()) {
+ case BOOLEAN:
+ return new FieldBooleanConverter(parent);
+ case CHAR:
+ return new FieldCharConverter(parent);
+ case INT2:
+ return new FieldInt2Converter(parent);
+ case INT4:
+ case INET4:
+ case DATE:
+ return new FieldInt4Converter(parent);
+ case INT8:
+ case TIMESTAMP:
+ case TIME:
+ return new FieldInt8Converter(parent);
+ case FLOAT4:
+ return new FieldFloat4Converter(parent);
+ case FLOAT8:
+ return new FieldFloat8Converter(parent);
+ case INET6:
+ throw new RuntimeException("No converter for INET6");
+ case TEXT:
+ return new FieldTextConverter(parent);
+ case PROTOBUF:
+ return new FieldProtobufConverter(parent, dataType);
+ case BLOB:
+ return new FieldBlobConverter(parent);
+ case NULL_TYPE:
+ throw new RuntimeException("No converter for NULL_TYPE.");
+ default:
+ throw new RuntimeException("Unsupported data type");
+ }
+ }
+
+ /**
+ * Gets the converter for a specific field.
+ *
+ * @param fieldIndex Index of the field in the projection.
+ * @return The converter for the field.
+ */
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return converters[fieldIndex];
+ }
+
+ /**
+ * Called before processing a record. A fresh value array is allocated;
+ * slots that are never written (NULL values or NULL_TYPE columns) stay null.
+ */
+ @Override
+ public void start() {
+ currentTuple = new Object[tupleSize];
+ }
+
+ /**
+ * Called after all fields have been processed.
+ */
+ @Override
+ public void end() {
+ }
+
+ /**
+ * Returns the current record converted by this converter.
+ *
+ * @return The current record.
+ */
+ public Object [] getCurrentRecord() {
+ return currentTuple;
+ }
+
+ static abstract class ParentValueContainer {
+ /**
+ * Adds the value to the parent.
+ *
+ * @param value The value to add.
+ */
+ abstract void add(Object value);
+ }
+
+ static final class FieldBooleanConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldBooleanConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBoolean(boolean value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldCharConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldCharConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBinary(Binary value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldInt2Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldInt2Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add((short)value);
+ }
+ }
+
+ static final class FieldInt4Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldInt4Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldInt8Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldInt8Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addLong(long value) {
+ parent.add(value);
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldFloat4Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldFloat4Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(value);
+ }
+
+ @Override
+ final public void addLong(long value) {
+ parent.add(value);
+ }
+
+ @Override
+ final public void addFloat(float value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldFloat8Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldFloat8Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addInt(int value) {
+ parent.add(value);
+ }
+
+ @Override
+ final public void addLong(long value) {
+ parent.add(value);
+ }
+
+ @Override
+ final public void addFloat(float value) {
+ parent.add(value);
+ }
+
+ @Override
+ final public void addDouble(double value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldInet4Converter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldInet4Converter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldTextConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldTextConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldBlobConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+
+ public FieldBlobConverter(ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ parent.add(value);
+ }
+ }
+
+ static final class FieldProtobufConverter extends PrimitiveConverter {
+ private final ParentValueContainer parent;
+ private final DataType dataType;
+
+ public FieldProtobufConverter(ParentValueContainer parent,
+ DataType dataType) {
+ this.parent = parent;
+ this.dataType = dataType;
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ try {
+ ProtobufDatumFactory factory =
+ ProtobufDatumFactory.get(dataType.getCode());
+ Message.Builder builder = factory.newBuilder();
+ builder.mergeFrom(value.getBytes());
+ parent.add(factory.createDatum(builder));
+ } catch (InvalidProtocolBufferException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+}
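The converter pattern used above is uniform: each primitive converter captures a ParentValueContainer that writes one fixed tuple slot, so Parquet's column-wise callbacks assemble a row in place. A dependency-free sketch of that wiring (the names below are illustrative, not the Parquet API):

// Dependency-free sketch of the converter-to-slot wiring used above; names are illustrative.
public class ConverterWiringSketch {

  abstract static class ValueSink {
    abstract void add(Object value);
  }

  // Stands in for a PrimitiveConverter: the reader pushes one decoded value at a time.
  static final class IntConverter {
    private final ValueSink sink;
    IntConverter(ValueSink sink) { this.sink = sink; }
    void addInt(int value) { sink.add(value); }
  }

  public static void main(String[] args) {
    final Object[] row = new Object[2];
    // Each converter captures the slot it fills, just as the anonymous
    // ParentValueContainer above calls set(projectionIndex, value).
    IntConverter col0 = new IntConverter(new ValueSink() {
      @Override void add(Object v) { row[0] = v; }
    });
    IntConverter col1 = new IntConverter(new ValueSink() {
      @Override void add(Object v) { row[1] = v; }
    });
    col0.addInt(42);
    col1.addInt(7);
    System.out.println(row[0] + ", " + row[1]); // prints: 42, 7
  }
}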
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java
new file mode 100644
index 0000000..24920a6
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockParquetReader.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.common.TajoDataTypes;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+import org.apache.tajo.tuple.offheap.RowWriter;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.GlobalMetaData;
+import parquet.io.api.Binary;
+import parquet.schema.MessageType;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.*;
+
+public class ParquetRowBlockParquetReader implements Closeable {
+
+ private ReadSupport<Object []> readSupport;
+ private UnboundRecordFilter filter;
+ private Configuration conf;
+ private ReadSupport.ReadContext readContext;
+ private Iterator<Footer> footersIterator;
+ private ParquetRowDirectReader reader;
+ private GlobalMetaData globalMetaData;
+ private final int [] projectedMap;
+
+ public ParquetRowBlockParquetReader(Path file, Schema schema, Schema target) throws IOException {
+ this(file, new ParquetRowReadSupport(schema, target));
+ }
+
+ /**
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @throws java.io.IOException
+ */
+ public ParquetRowBlockParquetReader(Path file, ParquetRowReadSupport readSupport) throws IOException {
+ this(file, readSupport, null);
+ }
+
+ /**
+ * @param conf the configuration
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @throws java.io.IOException
+ */
+ public ParquetRowBlockParquetReader(Configuration conf, Path file, ParquetRowReadSupport readSupport) throws IOException {
+ this(conf, file, readSupport, null);
+ }
+
+ /**
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @param filter the filter to use to filter records
+ * @throws java.io.IOException
+ */
+ private ParquetRowBlockParquetReader(Path file, ParquetRowReadSupport readSupport, UnboundRecordFilter filter)
+ throws IOException {
+ this(new Configuration(), file, readSupport, filter);
+ }
+
+ /**
+ * @param conf the configuration
+ * @param file the file to read
+ * @param readSupport to materialize records
+ * @param filter the filter to use to filter records
+ * @throws java.io.IOException
+ */
+ public ParquetRowBlockParquetReader(Configuration conf, Path file, ParquetRowReadSupport readSupport,
+ UnboundRecordFilter filter) throws IOException {
+
+ this.readSupport = readSupport;
+ this.filter = filter;
+ this.conf = conf;
+
+ FileSystem fs = file.getFileSystem(conf);
+ List<FileStatus> statuses = Arrays.asList(fs.listStatus(file));
+ List<Footer> footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(conf, statuses);
+ this.footersIterator = footers.iterator();
+ globalMetaData = ParquetFileWriter.getGlobalMetaData(footers);
+
+ List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
+ for (Footer footer : footers) {
+ blocks.addAll(footer.getParquetMetadata().getBlocks());
+ }
+
+ MessageType schema = globalMetaData.getSchema();
+ Map<String, Set<String>> extraMetadata = globalMetaData.getKeyValueMetaData();
+ readContext = readSupport.init(new InitContext(conf, extraMetadata, schema));
+
+ this.tajoSchema = readSupport.getSchema();
+ this.columnNum = tajoSchema.size();
+ tajoTypes = new TajoDataTypes.Type[columnNum];
+ for (int i = 0; i < columnNum; i++) {
+ tajoTypes[i] = tajoSchema.getColumn(i).getDataType().getType();
+ }
+
+ projectedMap = new int[readSupport.getTargetSchema().size()];
+ for (int i = 0; i < readSupport.getTargetSchema().size(); i++) {
+ projectedMap[i] = tajoSchema.getColumnId(readSupport.getTargetSchema().getColumn(i).getQualifiedName());
+ }
+ }
+
+ Schema tajoSchema;
+ int columnNum;
+ TajoDataTypes.Type [] tajoTypes;
+
+ /**
+ * @return true if at least one row was written into the row block; false when no rows remain
+ * @throws java.io.IOException
+ */
+ public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+ rowBlock.clear();
+
+ try {
+ if (reader != null) {
+ RowWriter writer = rowBlock.getWriter();
+
+ while(rowBlock.rows() < rowBlock.maxRowNum() && reader.nextKeyValue()) {
+
+ writer.startRow();
+ int prevId = -1;
+ Object [] values = reader.getCurrentValue();
+ for (int columnIdx = 0; columnIdx < projectedMap.length; columnIdx++) {
+ int actualId = projectedMap[columnIdx];
+
+ if (actualId - prevId > 1) {
+ writer.skipField((actualId - prevId) - 1);
+ }
+
+ if (values[actualId] != null) {
+ switch (tajoTypes[actualId]) {
+ case BOOLEAN:
+ writer.putBool((Boolean) values[actualId]);
+ break;
+ case CHAR:
+ writer.putText(((Binary) values[actualId]).getBytes());
+ break;
+ case INT1:
+ case INT2:
+ writer.putInt2((Short) values[actualId]);
+ break;
+ case INT4:
+ case INET4:
+ case DATE:
+ writer.putInt4((Integer) values[actualId]);
+ break;
+ case INT8:
+ case TIMESTAMP:
+ case TIME:
+ writer.putInt8((Long) values[actualId]);
+ break;
+ case FLOAT4:
+ writer.putFloat4((Float) values[actualId]);
+ break;
+ case FLOAT8:
+ writer.putFloat8((Double) values[actualId]);
+ break;
+ case TEXT:
+ writer.putText(((Binary) values[actualId]).getBytes());
+ break;
+ case BLOB:
+ writer.putBlob(((Binary) values[actualId]).getBytes());
+ break;
+
+ default:
+ throw new IOException("Not supported type: " + tajoTypes[actualId].name());
+ }
+ } else {
+ writer.skipField();
+ }
+
+ prevId = actualId;
+ }
+
+ writer.endRow();
+ }
+
+ return rowBlock.rows() > 0;
+ } else {
+ initReader();
+ // reader stays null once all footers are consumed; avoid unboxing a null Boolean
+ return reader != null && nextFetch(rowBlock);
+ }
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private void initReader() throws IOException {
+ if (reader != null) {
+ reader.close();
+ reader = null;
+ }
+ if (footersIterator.hasNext()) {
+ Footer footer = footersIterator.next();
+ reader = new ParquetRowDirectReader(readSupport, filter);
+ reader.initialize(
+ readContext.getRequestedSchema(), globalMetaData.getSchema(), footer.getParquetMetadata().getFileMetaData()
+ .getKeyValueMetaData(),
+ readContext.getReadSupportMetadata(), footer.getFile(), footer.getParquetMetadata().getBlocks(), conf);
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+}
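The nextFetch contract implemented above is: clear the block, append rows until the block is full or the input is exhausted, and report whether anything was written, so callers simply loop until it returns false. A dependency-free sketch of that contract (RowBlock below is a stand-in, not Tajo's OffHeapRowBlock):

// Dependency-free sketch of the nextFetch(rowBlock) contract; RowBlock is a stand-in.
import java.util.Arrays;
import java.util.Iterator;

public class NextFetchContractSketch {

  static final class RowBlock {
    private final Object[][] rows;
    private int count;
    RowBlock(int maxRowNum) { rows = new Object[maxRowNum][]; }
    void clear() { count = 0; }
    int rows() { return count; }
    int maxRowNum() { return rows.length; }
    void add(Object[] row) { rows[count++] = row; }
  }

  // Fill the block from the source; true while at least one row was written.
  static boolean nextFetch(RowBlock block, Iterator<Object[]> source) {
    block.clear();
    while (block.rows() < block.maxRowNum() && source.hasNext()) {
      block.add(source.next());
    }
    return block.rows() > 0;
  }

  public static void main(String[] args) {
    Iterator<Object[]> source = Arrays.asList(
        new Object[]{1, "a"}, new Object[]{2, "b"}, new Object[]{3, "c"}).iterator();
    RowBlock block = new RowBlock(2);
    while (nextFetch(block, source)) {
      System.out.println("fetched " + block.rows() + " rows"); // 2 rows, then 1 row
    }
  }
}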
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockScanner.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockScanner.java
new file mode 100644
index 0000000..b771cef
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowBlockScanner.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.exception.UnimplementedException;
+import org.apache.tajo.storage.FileScanner;
+import org.apache.tajo.storage.Tuple;
+import org.apache.tajo.storage.fragment.FileFragment;
+import org.apache.tajo.tuple.offheap.OffHeapRowBlock;
+
+import java.io.IOException;
+
+/**
+ * FileScanner for reading Parquet files
+ */
+public class ParquetRowBlockScanner extends FileScanner {
+ private ParquetRowBlockParquetReader reader;
+
+ /**
+ * Creates a new ParquetRowBlockScanner.
+ *
+ * @param conf the Hadoop configuration
+ * @param schema the Tajo schema of the table
+ * @param meta the table meta
+ * @param fragment the file fragment to scan
+ */
+ public ParquetRowBlockScanner(Configuration conf, final Schema schema,
+ final TableMeta meta, final FileFragment fragment) {
+ super(conf, schema, meta, fragment);
+ }
+
+ /**
+ * Initializes the scanner. This method initializes the underlying
+ * ParquetRowBlockParquetReader.
+ */
+ @Override
+ public void init() throws IOException {
+ if (targets == null) {
+ targets = schema.toArray();
+ }
+ reader = new ParquetRowBlockParquetReader(fragment.getPath(), schema, new Schema(targets));
+ super.init();
+ }
+
+ /**
+ * Tuple-at-a-time access is not supported by this scanner; use
+ * nextFetch(OffHeapRowBlock) instead.
+ */
+ @Override
+ public Tuple next() throws IOException {
+ throw new UnimplementedException("next() is not implemented.");
+ }
+
+ @Override
+ public boolean nextFetch(OffHeapRowBlock rowBlock) throws IOException {
+ return reader.nextFetch(rowBlock);
+ }
+
+ /**
+ * Resets the scanner
+ */
+ @Override
+ public void reset() throws IOException {
+ }
+
+ /**
+ * Closes the scanner.
+ */
+ @Override
+ public void close() throws IOException {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+
+ /**
+ * Returns whether this scanner is projectable.
+ *
+ * @return true
+ */
+ @Override
+ public boolean isProjectable() {
+ return true;
+ }
+
+ /**
+ * Returns whether this scanner is selectable.
+ *
+ * @return false
+ */
+ @Override
+ public boolean isSelectable() {
+ return false;
+ }
+
+ /**
+ * Returns whether this scanner is splittable.
+ *
+ * @return false
+ */
+ @Override
+ public boolean isSplittable() {
+ return false;
+ }
+}
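The three capability flags at the bottom tell the planner what it may push into this scanner: projection yes, selection and splitting no. A dependency-free sketch of how a caller might branch on them (ScannerCaps is an illustrative stand-in, not Tajo's Scanner interface):

// Sketch: how a planner-style caller can branch on the capability flags above.
public class CapabilityFlagsSketch {

  interface ScannerCaps {
    boolean isProjectable();
    boolean isSelectable();
    boolean isSplittable();
  }

  static String plan(ScannerCaps s) {
    return (s.isProjectable() ? "push projection; " : "project after scan; ")
         + (s.isSelectable()  ? "push filter; "     : "filter after scan; ")
         + (s.isSplittable()  ? "split fragments"   : "one fragment per file");
  }

  public static void main(String[] args) {
    // Mirrors ParquetRowBlockScanner: projectable only.
    ScannerCaps parquetLike = new ScannerCaps() {
      public boolean isProjectable() { return true; }
      public boolean isSelectable()  { return false; }
      public boolean isSplittable()  { return false; }
    };
    System.out.println(plan(parquetLike));
    // prints: push projection; filter after scan; one fragment per file
  }
}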
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowDirectReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowDirectReader.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowDirectReader.java
new file mode 100644
index 0000000..0fda072
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowDirectReader.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import parquet.Log;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.PageReadStore;
+import parquet.filter.UnboundRecordFilter;
+import parquet.hadoop.api.ReadSupport;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.util.counters.BenchmarkCounter;
+import parquet.io.ColumnIOFactory;
+import parquet.io.MessageColumnIO;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import static java.lang.String.format;
+import static parquet.Log.DEBUG;
+
+public class ParquetRowDirectReader {
+ private static final Log LOG = Log.getLog(ParquetRowDirectReader.class);
+
+ private final ColumnIOFactory columnIOFactory = new ColumnIOFactory();
+
+ private MessageType requestedSchema;
+ private MessageType fileSchema;
+ private int columnCount;
+ private final ReadSupport<Object []> readSupport;
+
+ private RecordMaterializer<Object []> recordConverter;
+
+ private Object [] currentValue;
+ private long total;
+ private int current = 0;
+ private int currentBlock = -1;
+ private ParquetFileReader reader;
+ private parquet.io.RecordReader<Object []> recordReader;
+ private UnboundRecordFilter recordFilter;
+
+ private long totalTimeSpentReadingBytes;
+ private long totalTimeSpentProcessingRecords;
+ private long startedAssemblingCurrentBlockAt;
+
+ private long totalCountLoadedSoFar = 0;
+
+ private Path file;
+
+ /**
+ * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+ */
+ public ParquetRowDirectReader(ReadSupport<Object[]> readSupport) {
+ this(readSupport, null);
+ }
+
+ /**
+ * @param readSupport Object which helps read files of the given type, e.g. Thrift, Avro.
+ * @param filter Optional filter for only returning matching records.
+ */
+ public ParquetRowDirectReader(ReadSupport<Object[]> readSupport, UnboundRecordFilter filter) {
+ this.readSupport = readSupport;
+ this.recordFilter = filter;
+ }
+
+ private void checkRead() throws IOException {
+ if (current == totalCountLoadedSoFar) {
+ if (current != 0) {
+ long timeAssembling = System.currentTimeMillis() - startedAssemblingCurrentBlockAt;
+ totalTimeSpentProcessingRecords += timeAssembling;
+ LOG.info("Assembled and processed " + totalCountLoadedSoFar + " records from " + columnCount + " columns in "
+ + totalTimeSpentProcessingRecords + " ms: "+((float)totalCountLoadedSoFar / totalTimeSpentProcessingRecords)
+ + " rec/ms, " + ((float)totalCountLoadedSoFar * columnCount / totalTimeSpentProcessingRecords) + " cell/ms");
+ long totalTime = totalTimeSpentProcessingRecords + totalTimeSpentReadingBytes;
+ long percentReading = 100 * totalTimeSpentReadingBytes / totalTime;
+ long percentProcessing = 100 * totalTimeSpentProcessingRecords / totalTime;
+ LOG.info("time spent so far " + percentReading + "% reading ("+totalTimeSpentReadingBytes+" ms) and " +
+ percentProcessing + "% processing ("+totalTimeSpentProcessingRecords+" ms)");
+ }
+
+ LOG.info("at row " + current + ". reading next block");
+ long t0 = System.currentTimeMillis();
+ PageReadStore pages = reader.readNextRowGroup();
+ if (pages == null) {
+ throw new IOException("expecting more rows but reached last block. Read " + current + " out of " + total);
+ }
+ long timeSpentReading = System.currentTimeMillis() - t0;
+ totalTimeSpentReadingBytes += timeSpentReading;
+ BenchmarkCounter.incrementTime(timeSpentReading);
+ LOG.info("block read in memory in " + timeSpentReading + " ms. row count = " + pages.getRowCount());
+ if (Log.DEBUG) LOG.debug("initializing Record assembly with requested schema " + requestedSchema);
+ MessageColumnIO columnIO = columnIOFactory.getColumnIO(requestedSchema, fileSchema);
+ recordReader = columnIO.getRecordReader(pages, recordConverter, recordFilter);
+ startedAssemblingCurrentBlockAt = System.currentTimeMillis();
+ totalCountLoadedSoFar += pages.getRowCount();
+ ++ currentBlock;
+ }
+ }
+
+ public void close() throws IOException {
+ reader.close();
+ }
+
+ public Object [] getCurrentValue() throws IOException, InterruptedException {
+ return currentValue;
+ }
+
+ public float getProgress() throws IOException, InterruptedException {
+ return (float) current / total;
+ }
+
+ public void initialize(MessageType requestedSchema, MessageType fileSchema,
+ Map<String, String> extraMetadata, Map<String, String> readSupportMetadata,
+ Path file, List<BlockMetaData> blocks, Configuration configuration)
+ throws IOException {
+ this.requestedSchema = requestedSchema;
+ this.fileSchema = fileSchema;
+ this.file = file;
+ this.columnCount = this.requestedSchema.getPaths().size();
+ this.recordConverter = readSupport.prepareForRead(
+ configuration, extraMetadata, fileSchema,
+ new ReadSupport.ReadContext(requestedSchema, readSupportMetadata));
+
+ List<ColumnDescriptor> columns = requestedSchema.getColumns();
+ reader = new ParquetFileReader(configuration, file, blocks, columns);
+ for (BlockMetaData block : blocks) {
+ total += block.getRowCount();
+ }
+ LOG.info("RecordReader initialized will read a total of " + total + " records.");
+ }
+
+ private boolean contains(GroupType group, String[] path, int index) {
+ if (index == path.length) {
+ return false;
+ }
+ if (group.containsField(path[index])) {
+ Type type = group.getType(path[index]);
+ if (type.isPrimitive()) {
+ return index + 1 == path.length;
+ } else {
+ return contains(type.asGroupType(), path, index + 1);
+ }
+ }
+ return false;
+ }
+
+ public boolean nextKeyValue() throws IOException, InterruptedException {
+ if (current < total) {
+ try {
+ checkRead();
+ currentValue = recordReader.read();
+ if (DEBUG) LOG.debug("read value: " + currentValue);
+ current ++;
+ } catch (RuntimeException e) {
+ throw new ParquetDecodingException(format("Can not read value at %d in block %d in file %s", current,
+ currentBlock, file), e);
+ }
+ return true;
+ }
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowMaterializer.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowMaterializer.java
new file mode 100644
index 0000000..212e805
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowMaterializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.tajo.catalog.Column;
+import org.apache.tajo.catalog.Schema;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+/**
+ * Materializes a Tajo Tuple from a stream of Parquet data.
+ */
+class ParquetRowMaterializer extends RecordMaterializer<Object []> {
+ private final ParquetRowBlockConverter root;
+
+ /**
+ * Creates a new ParquetRowMaterializer.
+ *
+ * @param parquetSchema The Parquet schema of the projection.
+ * @param tajoSchema The Tajo schema of the projection.
+ * @param tajoReadSchema The Tajo schema of the table.
+ */
+ public ParquetRowMaterializer(MessageType parquetSchema, Schema tajoSchema, Schema tajoReadSchema) {
+ int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema);
+ this.root = new ParquetRowBlockConverter(parquetSchema, tajoReadSchema, projectionMap);
+ }
+
+ private int[] getProjectionMap(Schema schema, Schema projection) {
+ Column[] targets = projection.toArray();
+ int[] projectionMap = new int[targets.length];
+ for (int i = 0; i < targets.length; ++i) {
+ int tid = schema.getColumnId(targets[i].getQualifiedName());
+ projectionMap[i] = tid;
+ }
+ return projectionMap;
+ }
+
+ /**
+ * Returns the current record being materialized.
+ *
+ * @return The record being materialized.
+ */
+ @Override
+ public Object [] getCurrentRecord() {
+ return root.getCurrentRecord();
+ }
+
+ /**
+ * Returns the root converter.
+ *
+ * @return The root converter
+ */
+ @Override
+ public GroupConverter getRootConverter() {
+ return root;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/java/parquet/hadoop/ParquetRowReadSupport.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/java/parquet/hadoop/ParquetRowReadSupport.java b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowReadSupport.java
new file mode 100644
index 0000000..0e09d3f
--- /dev/null
+++ b/tajo-storage/src/main/java/parquet/hadoop/ParquetRowReadSupport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.storage.parquet.TajoSchemaConverter;
+import parquet.Log;
+import parquet.hadoop.api.InitContext;
+import parquet.hadoop.api.ReadSupport;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import java.util.Map;
+
+/**
+ * Tajo implementation of {@link parquet.hadoop.api.ReadSupport} for {@link org.apache.tajo.storage.Tuple}s.
+ * Users should use {@link parquet.hadoop.ParquetRowBlockScanner} and not this class directly.
+ */
+public class ParquetRowReadSupport extends ReadSupport<Object []> {
+ private static final Log LOG = Log.getLog(ParquetRowReadSupport.class);
+
+ private Schema readSchema;
+ private Schema requestedSchema;
+
+ /**
+ * Creates a new ParquetRowReadSupport.
+ *
+ * @param readSchema The Tajo schema of the table.
+ * @param requestedSchema The Tajo schema of the requested projection passed
+ * down by the scanner.
+ */
+ public ParquetRowReadSupport(Schema readSchema, Schema requestedSchema) {
+ super();
+ this.readSchema = readSchema;
+ this.requestedSchema = requestedSchema;
+ }
+
+ /**
+ * Creates a new ParquetRowReadSupport.
+ *
+ * @param readSchema The schema of the table.
+ */
+ public ParquetRowReadSupport(Schema readSchema) {
+ super();
+ this.readSchema = readSchema;
+ this.requestedSchema = readSchema;
+ }
+
+ /**
+ * Initializes the ReadSupport.
+ *
+ * @param context The InitContext.
+ * @return A ReadContext that defines how to read the file.
+ */
+ @Override
+ public ReadContext init(InitContext context) {
+ if (requestedSchema == null) {
+ throw new RuntimeException("requestedSchema is null.");
+ }
+ MessageType requestedParquetSchema =
+ new TajoSchemaConverter().convert(requestedSchema);
+ LOG.debug("Reading data with projection:\n" + requestedParquetSchema);
+ return new ReadContext(requestedParquetSchema);
+ }
+
+ /**
+ * Prepares for read.
+ *
+ * @param configuration The job configuration.
+ * @param keyValueMetaData App-specific metadata from the file.
+ * @param fileSchema The schema of the Parquet file.
+ * @param readContext Returned by the init method.
+ */
+ @Override
+ public RecordMaterializer<Object []> prepareForRead(
+ Configuration configuration,
+ Map<String, String> keyValueMetaData,
+ MessageType fileSchema,
+ ReadContext readContext) {
+ MessageType parquetRequestedSchema = readContext.getRequestedSchema();
+ return new ParquetRowMaterializer(parquetRequestedSchema, requestedSchema, readSchema);
+ }
+
+ public Schema getSchema() {
+ return readSchema;
+ }
+
+ public Schema getTargetSchema() {
+ return requestedSchema;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/main/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/main/resources/storage-default.xml b/tajo-storage/src/main/resources/storage-default.xml
index b227e9d..b0e952e 100644
--- a/tajo-storage/src/main/resources/storage-default.xml
+++ b/tajo-storage/src/main/resources/storage-default.xml
@@ -40,7 +40,7 @@
<!--- Registered Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
- <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw</value>
+ <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw,block_parquet</value>
</property>
<!--- Fragment Class Configurations -->
@@ -73,6 +73,10 @@
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
+ <name>tajo.storage.fragment.block_parquet.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
<name>tajo.storage.fragment.sequencefile.class</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
@@ -143,6 +147,11 @@
</property>
<property>
+ <name>tajo.storage.scanner-handler.block_parquet.class</name>
+ <value>parquet.hadoop.ParquetRowBlockScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.v2.parquet.class</name>
<value>org.apache.tajo.storage.parquet.ParquetScanner</value>
</property>
@@ -170,7 +179,7 @@
<!--- Appender Handler -->
<property>
<name>tajo.storage.appender-handler</name>
- <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+ <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,block_parquet</value>
</property>
<property>
@@ -209,6 +218,11 @@
</property>
<property>
+ <name>tajo.storage.appender-handler.block_parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+ </property>
+
+ <property>
<name>tajo.storage.appender-handler.sequencefile.class</name>
<value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
</property>
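The XML above follows the usual Tajo convention: a store-type name registered in the scanner-handler or appender-handler list resolves to an implementation class through a tajo.storage.scanner-handler.<name>.class style key. A minimal sketch of that indirection (the map stands in for the loaded configuration; the real StorageManager performs the reflective construction):

// Sketch of the name-to-class indirection configured above; the map stands in for the loaded XML.
import java.util.HashMap;
import java.util.Map;

public class HandlerLookupSketch {
  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("tajo.storage.scanner-handler.block_parquet.class",
             "parquet.hadoop.ParquetRowBlockScanner");
    conf.put("tajo.storage.appender-handler.block_parquet.class",
             "org.apache.tajo.storage.parquet.ParquetAppender");

    String storeType = "block_parquet";
    String scannerClass = conf.get("tajo.storage.scanner-handler." + storeType + ".class");
    System.out.println("scanner class for " + storeType + ": " + scannerClass);
    // The real StorageManager would resolve this with Class.forName(scannerClass)
    // and reflective construction; elided here so the sketch runs standalone.
  }
}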
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
index e81964b..11d0eb4 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestNextFetches.java
@@ -97,11 +97,14 @@ public class TestNextFetches {
" ]\n" +
"}\n";
+ private Schema schema;
+
private StoreType storeType;
private boolean splitable;
private boolean statsable;
private Path testDir;
private FileSystem fs;
+ private Tuple allTypedTuple;
public TestNextFetches(StoreType type, boolean splitable, boolean statsable) throws IOException {
this.storeType = type;
@@ -116,6 +119,43 @@ public class TestNextFetches {
testDir = CommonTestingUtil.getTestDir(TEST_PATH);
fs = testDir.getFileSystem(conf);
+
+ schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.CHAR, 7);
+ schema.addColumn("col3", Type.INT2);
+ schema.addColumn("col4", Type.INT4);
+ schema.addColumn("col5", Type.INT8);
+ schema.addColumn("col6", Type.FLOAT4);
+ schema.addColumn("col7", Type.FLOAT8);
+ schema.addColumn("col8", Type.TEXT);
+ schema.addColumn("col9", Type.BLOB);
+ schema.addColumn("col10", Type.INET4);
+ schema.addColumn("col11", Type.NULL_TYPE);
+ if (storeType == StoreType.RAW) {
+ schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+ }
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+ int columnNum = 11 + (storeType == StoreType.RAW ? 1 : 0);
+ allTypedTuple = new VTuple(columnNum);
+ allTypedTuple.put(new Datum[]{
+ DatumFactory.createBool(true),
+ DatumFactory.createChar("jinho"),
+ DatumFactory.createInt2((short) 17),
+ DatumFactory.createInt4(59),
+ DatumFactory.createInt8(23L),
+ DatumFactory.createFloat4(77.9f),
+ DatumFactory.createFloat8(271.9f),
+ DatumFactory.createText("jinho"),
+ DatumFactory.createBlob("jinho babo".getBytes()),
+ DatumFactory.createInet4("192.168.0.1"),
+ NullDatum.get(),
+ });
+ if (storeType == StoreType.RAW) {
+ allTypedTuple.put(11, factory.createDatum(queryid.getProto()));
+ }
}
@Parameterized.Parameters
@@ -125,7 +165,7 @@ public class TestNextFetches {
// TODO - to be implemented
// {StoreType.RAW, false, false},
// {StoreType.RCFILE, true, true},
-// {StoreType.PARQUET, false, false},
+ {StoreType.BLOCK_PARQUET, false, false},
// {StoreType.SEQUENCEFILE, true, true},
// {StoreType.AVRO, false, false},
});
@@ -301,12 +341,13 @@ public class TestNextFetches {
|| storeType == StoreType.TREVNI
|| storeType == StoreType.CSV
|| storeType == StoreType.PARQUET
+ || storeType == StoreType.BLOCK_PARQUET
|| storeType == StoreType.SEQUENCEFILE
|| storeType == StoreType.AVRO) {
assertTrue(tuple.isNull(0));
}
- assertTrue(tupleCnt + 2 == tuple.getInt8(1));
- assertTrue(tupleCnt + 3 == tuple.getFloat4(2));
+ assertTrue(tuple.toString(), tupleCnt + 2 == tuple.getInt8(1));
+ assertTrue(tuple.toString(), tupleCnt + 3 == tuple.getFloat4(2));
tupleCnt++;
}
}
@@ -319,20 +360,6 @@ public class TestNextFetches {
@Test
public void testVariousTypes() throws IOException {
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.CHAR, 7);
- schema.addColumn("col3", Type.INT2);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.INT8);
- schema.addColumn("col6", Type.FLOAT4);
- schema.addColumn("col7", Type.FLOAT8);
- schema.addColumn("col8", Type.TEXT);
- schema.addColumn("col9", Type.BLOB);
- schema.addColumn("col10", Type.INET4);
- schema.addColumn("col11", Type.NULL_TYPE);
- schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
@@ -344,26 +371,7 @@ public class TestNextFetches {
Path tablePath = new Path(testDir, "testVariousTypes.data");
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(12);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createChar("hyunsik"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("hyunsik"),
- DatumFactory.createBlob("hyunsik".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
+ appender.addTuple(allTypedTuple);
appender.flush();
appender.close();
@@ -379,8 +387,14 @@ public class TestNextFetches {
while (scanner.nextFetch(rowBlock)) {
RowBlockReader reader = rowBlock.getReader();
while (reader.next(zcTuple)) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), zcTuple.get(i));
+ for (int i = 0; i < allTypedTuple.size(); i++) {
+ if (schema.getColumn(i).getDataType().getType() == Type.CHAR) {
+ assertEquals(i + "th column is different.",
+ allTypedTuple.get(i).asChars().trim(), zcTuple.get(i).asChars().trim());
+ } else {
+ assertEquals(i + "th column is different.", allTypedTuple.get(i), zcTuple.get(i));
+ }
+
}
}
}
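
Note on the trim() calls above: CHAR(n) is a fixed-width type, so a CHAR(7)
value such as "jinho" may come back padded with trailing spaces to its
declared width, depending on how the scanner materializes it. A minimal,
self-contained Java sketch of why the raw comparison would fail while the
trimmed one passes (the padding behavior is an assumption for illustration,
not a verified property of every Tajo scanner):

    public class CharPaddingDemo {
      // Pads a value to the declared CHAR width, as a padding scanner might.
      static String padToWidth(String s, int width) {
        StringBuilder sb = new StringBuilder(s);
        while (sb.length() < width) {
          sb.append(' ');
        }
        return sb.toString();
      }

      public static void main(String[] args) {
        String written = "jinho";                 // five chars stored as CHAR(7)
        String readBack = padToWidth(written, 7); // "jinho  " after a padded round-trip
        System.out.println(written.equals(readBack));               // false
        System.out.println(written.trim().equals(readBack.trim())); // true
      }
    }
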
@@ -391,20 +405,6 @@ public class TestNextFetches {
@Test
public void testNullHandlingTypes() throws IOException {
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.CHAR, 7);
- schema.addColumn("col3", Type.INT2);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.INT8);
- schema.addColumn("col6", Type.FLOAT4);
- schema.addColumn("col7", Type.FLOAT8);
- schema.addColumn("col8", Type.TEXT);
- schema.addColumn("col9", Type.BLOB);
- schema.addColumn("col10", Type.INET4);
- schema.addColumn("col11", Type.NULL_TYPE);
- schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
meta.setOptions(StorageUtil.newPhysicalProperties(storeType));
@@ -421,34 +421,16 @@ public class TestNextFetches {
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.init();
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple seedTuple = new VTuple(12);
- seedTuple.put(new Datum[]{
- DatumFactory.createBool(true), // 0
- DatumFactory.createChar("hyunsik"), // 1
- DatumFactory.createInt2((short) 17), // 2
- DatumFactory.createInt4(59), // 3
- DatumFactory.createInt8(23l), // 4
- DatumFactory.createFloat4(77.9f), // 5
- DatumFactory.createFloat8(271.9f), // 6
- DatumFactory.createText("hyunsik"), // 7
- DatumFactory.createBlob("hyunsik".getBytes()),// 8
- DatumFactory.createInet4("192.168.0.1"), // 9
- NullDatum.get(), // 10
- factory.createDatum(queryid.getProto()) // 11
- });
-
+ int columnNum = allTypedTuple.size();
// Making tuples with different null column positions
Tuple tuple;
- for (int i = 0; i < 12; i++) {
- tuple = new VTuple(12);
- for (int j = 0; j < 12; j++) {
+ for (int i = 0; i < columnNum; i++) {
+ tuple = new VTuple(columnNum);
+ for (int j = 0; j < columnNum; j++) {
if (i == j) { // i'th column will have NULL value
tuple.put(j, NullDatum.get());
} else {
- tuple.put(j, seedTuple.get(j));
+ tuple.put(j, allTypedTuple.get(j));
}
}
appender.addTuple(tuple);
@@ -471,12 +453,16 @@ public class TestNextFetches {
RowBlockReader reader = rowBlock.getReader();
while(reader.next(retrieved)) {
- assertEquals(12, retrieved.size());
- for (int j = 0; j < 12; j++) {
+ assertEquals(columnNum, retrieved.size());
+ for (int j = 0; j < columnNum; j++) {
if (i == j) {
assertEquals(NullDatum.get(), retrieved.get(j));
} else {
- assertEquals(seedTuple.get(j), retrieved.get(j));
+ if (schema.getColumn(j).getDataType().getType() == Type.CHAR) {
+ assertEquals(allTypedTuple.get(j).asChars().trim(), retrieved.get(j).asChars().trim());
+ } else {
+ assertEquals(allTypedTuple.get(j), retrieved.get(j));
+ }
}
}
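
The pattern above writes row i with a NULL at column i and then verifies
exactly that position on read-back, so every column type is exercised once
as NULL and once per row as a concrete value. The same rotation in a
standalone sketch:

    import java.util.Arrays;

    public class NullRotationDemo {
      public static void main(String[] args) {
        String[] seed = {"a", "b", "c", "d"};
        for (int i = 0; i < seed.length; i++) {
          String[] row = Arrays.copyOf(seed, seed.length);
          row[i] = null; // the i'th column of the i'th row is NULL
          System.out.println(Arrays.toString(row));
        }
      }
    }
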
@@ -492,20 +478,6 @@ public class TestNextFetches {
public void testRCFileTextSerializeDeserialize() throws IOException {
if(storeType != StoreType.RCFILE) return;
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.CHAR, 7);
- schema.addColumn("col3", Type.INT2);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.INT8);
- schema.addColumn("col6", Type.FLOAT4);
- schema.addColumn("col7", Type.FLOAT8);
- schema.addColumn("col8", Type.TEXT);
- schema.addColumn("col9", Type.BLOB);
- schema.addColumn("col10", Type.INET4);
- schema.addColumn("col11", Type.NULL_TYPE);
- schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
meta.putOption(StorageConstants.CSVFILE_SERDE, TextSerializerDeserializer.class.getName());
@@ -518,22 +490,8 @@ public class TestNextFetches {
QueryId queryid = new QueryId("12345", 5);
ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
- Tuple tuple = new VTuple(12);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createChar("jinho"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("jinho"),
- DatumFactory.createBlob("hyunsik babo".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
+ int columnNum = allTypedTuple.size();
+ appender.addTuple(allTypedTuple);
appender.flush();
appender.close();
@@ -551,8 +509,8 @@ public class TestNextFetches {
while (scanner.nextFetch(rowBlock)) {
RowBlockReader reader = rowBlock.getReader();
while (reader.next(retrieved)) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
+ for (int i = 0; i < allTypedTuple.size(); i++) {
+ assertEquals(allTypedTuple.get(i), retrieved.get(i));
}
}
}
@@ -567,20 +525,6 @@ public class TestNextFetches {
public void testRCFileBinarySerializeDeserialize() throws IOException {
if(storeType != StoreType.RCFILE) return;
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.CHAR, 7);
- schema.addColumn("col3", Type.INT2);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.INT8);
- schema.addColumn("col6", Type.FLOAT4);
- schema.addColumn("col7", Type.FLOAT8);
- schema.addColumn("col8", Type.TEXT);
- schema.addColumn("col9", Type.BLOB);
- schema.addColumn("col10", Type.INET4);
- schema.addColumn("col11", Type.NULL_TYPE);
- schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
meta.putOption(StorageConstants.RCFILE_SERDE, BinarySerializerDeserializer.class.getName());
@@ -589,27 +533,7 @@ public class TestNextFetches {
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(12);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
- DatumFactory.createChar("jinho"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("jinho"),
- DatumFactory.createBlob("hyunsik babo".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
+ appender.addTuple(allTypedTuple);
appender.flush();
appender.close();
@@ -627,8 +551,8 @@ public class TestNextFetches {
while (scanner.nextFetch(rowBlock)) {
RowBlockReader reader = rowBlock.getReader();
while (reader.next(retrieved)) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
+ for (int i = 0; i < allTypedTuple.size(); i++) {
+ assertEquals(allTypedTuple.get(i), retrieved.get(i));
}
}
}
@@ -643,20 +567,6 @@ public class TestNextFetches {
public void testSequenceFileTextSerializeDeserialize() throws IOException {
if(storeType != StoreType.SEQUENCEFILE) return;
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.CHAR, 7);
- schema.addColumn("col3", Type.INT2);
- schema.addColumn("col4", Type.INT4);
- schema.addColumn("col5", Type.INT8);
- schema.addColumn("col6", Type.FLOAT4);
- schema.addColumn("col7", Type.FLOAT8);
- schema.addColumn("col8", Type.TEXT);
- schema.addColumn("col9", Type.BLOB);
- schema.addColumn("col10", Type.INET4);
- schema.addColumn("col11", Type.NULL_TYPE);
- schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, TextSerializerDeserializer.class.getName());
@@ -665,26 +575,7 @@ public class TestNextFetches {
Appender appender = StorageManagerFactory.getStorageManager(conf).getAppender(meta, schema, tablePath);
appender.enableStats();
appender.init();
-
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(12);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createChar("jinho"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("jinho"),
- DatumFactory.createBlob("hyunsik babo".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
+ appender.addTuple(allTypedTuple);
appender.flush();
appender.close();
@@ -707,8 +598,8 @@ public class TestNextFetches {
while (scanner.nextFetch(rowBlock)) {
RowBlockReader reader = rowBlock.getReader();
while (reader.next(retrieved)) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
+ for (int i = 0; i < allTypedTuple.size(); i++) {
+ assertEquals(allTypedTuple.get(i), retrieved.get(i));
}
}
}
@@ -723,21 +614,6 @@ public class TestNextFetches {
public void testSequenceFileBinarySerializeDeserialize() throws IOException {
if(storeType != StoreType.SEQUENCEFILE) return;
- Schema schema = new Schema();
- schema.addColumn("col1", Type.BOOLEAN);
- schema.addColumn("col2", Type.BIT);
- schema.addColumn("col3", Type.CHAR, 7);
- schema.addColumn("col4", Type.INT2);
- schema.addColumn("col5", Type.INT4);
- schema.addColumn("col6", Type.INT8);
- schema.addColumn("col7", Type.FLOAT4);
- schema.addColumn("col8", Type.FLOAT8);
- schema.addColumn("col9", Type.TEXT);
- schema.addColumn("col10", Type.BLOB);
- schema.addColumn("col11", Type.INET4);
- schema.addColumn("col12", Type.NULL_TYPE);
- schema.addColumn("col13", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
-
KeyValueSet options = new KeyValueSet();
TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
meta.putOption(StorageConstants.SEQUENCEFILE_SERDE, BinarySerializerDeserializer.class.getName());
@@ -747,26 +623,7 @@ public class TestNextFetches {
appender.enableStats();
appender.init();
- QueryId queryid = new QueryId("12345", 5);
- ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
-
- Tuple tuple = new VTuple(13);
- tuple.put(new Datum[] {
- DatumFactory.createBool(true),
- DatumFactory.createBit((byte) 0x99),
- DatumFactory.createChar("jinho"),
- DatumFactory.createInt2((short) 17),
- DatumFactory.createInt4(59),
- DatumFactory.createInt8(23l),
- DatumFactory.createFloat4(77.9f),
- DatumFactory.createFloat8(271.9f),
- DatumFactory.createText("jinho"),
- DatumFactory.createBlob("hyunsik babo".getBytes()),
- DatumFactory.createInet4("192.168.0.1"),
- NullDatum.get(),
- factory.createDatum(queryid.getProto())
- });
- appender.addTuple(tuple);
+ appender.addTuple(allTypedTuple);
appender.flush();
appender.close();
@@ -789,8 +646,8 @@ public class TestNextFetches {
while (scanner.nextFetch(rowBlock)) {
RowBlockReader reader = rowBlock.getReader();
while (reader.next(retrieved)) {
- for (int i = 0; i < tuple.size(); i++) {
- assertEquals(tuple.get(i), retrieved.get(i));
+ for (int i = 0; i < allTypedTuple.size(); i++) {
+ assertEquals(allTypedTuple.get(i), retrieved.get(i));
}
}
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java b/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
index 49a162b..bab4180 100644
--- a/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
+++ b/tajo-storage/src/test/java/org/apache/tajo/storage/parquet/TestSchemaConverter.java
@@ -61,7 +61,7 @@ public class TestSchemaConverter {
" optional binary mytext (UTF8);\n" +
" optional binary myblob;\n" +
// NULL_TYPE fields are not encoded.
- " optional binary myinet4;\n" +
+ " optional int32 myinet4;\n" +
" optional binary myprotobuf;\n" +
"}\n";
http://git-wip-us.apache.org/repos/asf/tajo/blob/fade0a87/tajo-storage/src/test/resources/storage-default.xml
----------------------------------------------------------------------
diff --git a/tajo-storage/src/test/resources/storage-default.xml b/tajo-storage/src/test/resources/storage-default.xml
index 5bdeff0..01674a2 100644
--- a/tajo-storage/src/test/resources/storage-default.xml
+++ b/tajo-storage/src/test/resources/storage-default.xml
@@ -45,7 +45,7 @@
<!--- Registered Scanner Handler -->
<property>
<name>tajo.storage.scanner-handler</name>
- <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw</value>
+ <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,directraw,block_parquet</value>
</property>
<!--- Fragment Class Configurations -->
@@ -78,6 +78,10 @@
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
<property>
+ <name>tajo.storage.fragment.block_parquet.class</name>
+ <value>org.apache.tajo.storage.fragment.FileFragment</value>
+ </property>
+ <property>
<name>tajo.storage.fragment.sequencefile.class</name>
<value>org.apache.tajo.storage.fragment.FileFragment</value>
</property>
@@ -148,6 +152,11 @@
</property>
<property>
+ <name>tajo.storage.scanner-handler.block_parquet.class</name>
+ <value>parquet.hadoop.ParquetRowBlockScanner</value>
+ </property>
+
+ <property>
<name>tajo.storage.scanner-handler.v2.parquet.class</name>
<value>org.apache.tajo.storage.parquet.ParquetScanner</value>
</property>
@@ -175,7 +184,7 @@
<!--- Appender Handler -->
<property>
<name>tajo.storage.appender-handler</name>
- <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro</value>
+ <value>csv,raw,rcfile,row,trevni,parquet,sequencefile,avro,block_parquet</value>
</property>
<property>
@@ -214,6 +223,11 @@
</property>
<property>
+ <name>tajo.storage.appender-handler.block_parquet.class</name>
+ <value>org.apache.tajo.storage.parquet.ParquetAppender</value>
+ </property>
+
+ <property>
<name>tajo.storage.appender-handler.sequencefile.class</name>
<value>org.apache.tajo.storage.sequencefile.SequenceFileAppender</value>
</property>
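
The entries above follow one naming scheme:
tajo.storage.scanner-handler.<type>.class and
tajo.storage.appender-handler.<type>.class, plus the comma-separated handler
lists that enumerate the registered types. Registering block_parquet
therefore amounts to appending it to the two handler lists and supplying its
fragment, scanner, and appender classes. A hedged sketch of how such a key
can be resolved at runtime (the lookup class below is illustrative, not
Tajo's actual StorageManagerFactory code):

    import org.apache.hadoop.conf.Configuration;

    public class ScannerHandlerLookup {
      // Maps a store type to its registered scanner class, e.g.
      // "block_parquet" -> parquet.hadoop.ParquetRowBlockScanner.
      public static Class<?> scannerClassFor(Configuration conf, String storeType)
          throws ClassNotFoundException {
        String key = "tajo.storage.scanner-handler."
            + storeType.toLowerCase() + ".class";
        String className = conf.get(key);
        if (className == null) {
          throw new IllegalArgumentException("no scanner registered for " + storeType);
        }
        return Class.forName(className);
      }
    }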