You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that was not preserved in this plain-text copy.)
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/12/04 03:38:07 UTC
tajo git commit: TAJO-2010: Parquet can not read null value.
Repository: tajo
Updated Branches:
refs/heads/master 0ec2a890c -> 80218d03d
TAJO-2010: Parquet can not read null value.
Closes #903
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/80218d03
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/80218d03
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/80218d03
Branch: refs/heads/master
Commit: 80218d03dffbb16eaaf8eb8b82010c3e98e617d5
Parents: 0ec2a89
Author: Jinho Kim <jh...@apache.org>
Authored: Fri Dec 4 11:37:24 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Dec 4 11:37:24 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../storage/parquet/TajoRecordConverter.java | 13 +--
.../storage/parquet/TajoRecordMaterializer.java | 21 +---
.../org/apache/tajo/storage/TestStorages.java | 116 ++++++++++++++++++-
4 files changed, 124 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/80218d03/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 66c99a5..1796ff6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -55,6 +55,8 @@ Release 0.12.0 - unreleased
BUG FIXES
+ TAJO-2010: Parquet can not read null value. (jinho)
+
TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. (jinho)
TAJO-1753: GlobalEngine causes NPE occurs occasionally. (jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/80218d03/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7f236b6..7d73021 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -35,6 +35,7 @@ import parquet.schema.GroupType;
import parquet.schema.Type;
import java.nio.ByteBuffer;
+import java.util.Arrays;
/**
* Converter to convert a Parquet record into a Tajo Tuple.
@@ -146,7 +147,9 @@ public class TajoRecordConverter extends GroupConverter {
*/
@Override
public void start() {
- currentTuple = new VTuple(projectionMap.length);
+ Datum[] datums = new Datum[projectionMap.length];
+ Arrays.fill(datums, NullDatum.get());
+ currentTuple = new VTuple(datums);
}
/**
@@ -154,14 +157,6 @@ public class TajoRecordConverter extends GroupConverter {
*/
@Override
public void end() {
- for (int i = 0; i < projectionMap.length; ++i) {
- final int projectionIndex = projectionMap[i];
- Column column = tajoReadSchema.getColumn(projectionIndex);
- if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
- || currentTuple.isBlankOrNull(i)) {
- set(projectionIndex, NullDatum.get());
- }
- }
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/80218d03/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
index 436159c..25610fc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
@@ -18,8 +18,8 @@
package org.apache.tajo.storage.parquet;
-import org.apache.tajo.catalog.Column;
import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.Tuple;
import parquet.io.api.GroupConverter;
import parquet.io.api.RecordMaterializer;
@@ -35,24 +35,13 @@ class TajoRecordMaterializer extends RecordMaterializer<Tuple> {
* Creates a new TajoRecordMaterializer.
*
* @param parquetSchema The Parquet schema of the projection.
- * @param tajoSchema The Tajo schema of the projection.
+ * @param tajoRequestSchema The Tajo schema of the projection.
* @param tajoReadSchema The Tajo schema of the table.
*/
- public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema,
+ public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoRequestSchema,
Schema tajoReadSchema) {
- int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema);
- this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema,
- projectionMap);
- }
-
- private int[] getProjectionMap(Schema schema, Schema projection) {
- Column[] targets = projection.toArray();
- int[] projectionMap = new int[targets.length];
- for (int i = 0; i < targets.length; ++i) {
- int tid = schema.getColumnId(targets[i].getQualifiedName());
- projectionMap[i] = tid;
- }
- return projectionMap;
+ int[] projectionMap = PlannerUtil.getTargetIds(tajoReadSchema, tajoRequestSchema.toArray());
+ this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, projectionMap);
}
/**
http://git-wip-us.apache.org/repos/asf/tajo/blob/80218d03/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index c70e07c..c4514b9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -40,6 +40,7 @@ import org.apache.tajo.datum.DatumFactory;
import org.apache.tajo.datum.NullDatum;
import org.apache.tajo.datum.ProtobufDatumFactory;
import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
+import org.apache.tajo.plan.util.PlannerUtil;
import org.apache.tajo.storage.fragment.FileFragment;
import org.apache.tajo.storage.fragment.Fragment;
import org.apache.tajo.storage.rcfile.RCFile;
@@ -55,10 +56,9 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Random;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
@RunWith(Parameterized.class)
public class TestStorages {
@@ -541,6 +541,116 @@ public class TestStorages {
}
@Test
+ public void testNullHandlingTypesWithProjection() throws IOException {
+ if (internalType) return;
+
+ boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
+
+ Schema schema = new Schema();
+ schema.addColumn("col1", Type.BOOLEAN);
+ schema.addColumn("col2", Type.CHAR, 7);
+ schema.addColumn("col3", Type.INT2);
+ schema.addColumn("col4", Type.INT4);
+ schema.addColumn("col5", Type.INT8);
+ schema.addColumn("col6", Type.FLOAT4);
+ schema.addColumn("col7", Type.FLOAT8);
+ schema.addColumn("col8", Type.TEXT);
+ schema.addColumn("col9", Type.BLOB);
+ schema.addColumn("col10", Type.INET4);
+ schema.addColumn("col11", Type.NULL_TYPE);
+
+ if (handleProtobuf) {
+ schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+ }
+
+ KeyValueSet options = new KeyValueSet();
+ TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options);
+ meta.setPropertySet(CatalogUtil.newDefaultProperty(dataFormat));
+ meta.putProperty(StorageConstants.TEXT_NULL, "\\\\N");
+ meta.putProperty(StorageConstants.RCFILE_NULL, "\\\\N");
+ meta.putProperty(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+ meta.putProperty(StorageConstants.SEQUENCEFILE_NULL, "\\");
+ if (dataFormat.equalsIgnoreCase("AVRO")) {
+ meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+ }
+
+ Path tablePath = new Path(testDir, "testProjectedNullHandlingTypes.data");
+ FileTablespace sm = TablespaceManager.getLocalFs();
+ Appender appender = sm.getAppender(meta, schema, tablePath);
+ appender.init();
+
+ QueryId queryid = new QueryId("12345", 5);
+ ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+ int columnNum = 11 + (handleProtobuf ? 1 : 0);
+ VTuple seedTuple = new VTuple(columnNum);
+ seedTuple.put(new Datum[]{
+ DatumFactory.createBool(true), // 0
+ DatumFactory.createChar("hyunsik"), // 2
+ DatumFactory.createInt2((short) 17), // 3
+ DatumFactory.createInt4(59), // 4
+ DatumFactory.createInt8(23l), // 5
+ DatumFactory.createFloat4(77.9f), // 6
+ DatumFactory.createFloat8(271.9f), // 7
+ DatumFactory.createText("hyunsik"), // 8
+ DatumFactory.createBlob("hyunsik".getBytes()),// 9
+ DatumFactory.createInet4("192.168.0.1"), // 10
+ NullDatum.get(), // 11
+ });
+
+ if (handleProtobuf) {
+ seedTuple.put(11, factory.createDatum(queryid.getProto())); // 12
+ }
+
+ // Making tuples with different null column positions
+ Tuple tuple;
+ for (int i = 0; i < columnNum; i++) {
+ tuple = new VTuple(columnNum);
+ for (int j = 0; j < columnNum; j++) {
+ if (i == j) { // i'th column will have NULL value
+ tuple.put(j, NullDatum.get());
+ } else {
+ tuple.put(j, seedTuple.get(j));
+ }
+ }
+ appender.addTuple(tuple);
+ }
+ appender.flush();
+ appender.close();
+
+
+ // Making projection schema with different column positions
+ Schema target = new Schema();
+ Random random = new Random();
+ for (int i = 1; i < schema.size(); i++) {
+ int num = random.nextInt(schema.size() - 1) + 1;
+ if (i % num == 0) {
+ target.addColumn(schema.getColumn(i));
+ }
+ }
+
+ FileStatus status = fs.getFileStatus(tablePath);
+ FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+ Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
+ scanner.init();
+
+ Tuple retrieved;
+ int[] targetIds = PlannerUtil.getTargetIds(schema, target.toArray());
+ int i = 0;
+ while ((retrieved = scanner.next()) != null) {
+ assertEquals(target.size(), retrieved.size());
+ for (int j = 0; j < targetIds.length; j++) {
+ if (i == targetIds[j]) {
+ assertEquals(NullDatum.get(), retrieved.asDatum(j));
+ } else {
+ assertEquals(seedTuple.get(targetIds[j]), retrieved.asDatum(j));
+ }
+ }
+ i++;
+ }
+ scanner.close();
+ }
+
+ @Test
public void testRCFileTextSerializeDeserialize() throws IOException {
if(!dataFormat.equalsIgnoreCase(BuiltinStorages.RCFILE)) return;