Posted to commits@tajo.apache.org by jh...@apache.org on 2015/12/04 03:38:07 UTC

tajo git commit: TAJO-2010: Parquet cannot read null values.

Repository: tajo
Updated Branches:
  refs/heads/master 0ec2a890c -> 80218d03d


TAJO-2010: Parquet cannot read null values.

Closes #903


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/80218d03
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/80218d03
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/80218d03

Branch: refs/heads/master
Commit: 80218d03dffbb16eaaf8eb8b82010c3e98e617d5
Parents: 0ec2a89
Author: Jinho Kim <jh...@apache.org>
Authored: Fri Dec 4 11:37:24 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Dec 4 11:37:24 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../storage/parquet/TajoRecordConverter.java    |  13 +--
 .../storage/parquet/TajoRecordMaterializer.java |  21 +---
 .../org/apache/tajo/storage/TestStorages.java   | 116 ++++++++++++++++++-
 4 files changed, 124 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
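
TAJO-2010 in outline: TajoRecordConverter used to patch NULLs in end() after
conversion, TajoRecordMaterializer computed the projection map by hand, and a
new test in TestStorages exercises NULL handling under projection. At the
storage API level the failure showed up roughly as below; this is a sketch
reusing names from the new test further down, not a verbatim excerpt:

    // Scan a Parquet file through a projection over rows containing NULLs.
    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
    scanner.init();
    Tuple t;
    while ((t = scanner.next()) != null) {
      // before this fix, a NULL column could go missing or land at the
      // wrong projected position in t
    }
    scanner.close();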


http://git-wip-us.apache.org/repos/asf/tajo/blob/80218d03/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 66c99a5..1796ff6 100644
--- a/CHANGES
+++ b/CHANGES
@@ -55,6 +55,8 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2010: Parquet can not read null value. (jinho)
+
     TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. (jinho)
 
     TAJO-1753: GlobalEngine causes NPE occurs occasionally. (jinho)

http://git-wip-us.apache.org/repos/asf/tajo/blob/80218d03/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
index 7f236b6..7d73021 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordConverter.java
@@ -35,6 +35,7 @@ import parquet.schema.GroupType;
 import parquet.schema.Type;
 
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 /**
  * Converter to convert a Parquet record into a Tajo Tuple.
@@ -146,7 +147,9 @@ public class TajoRecordConverter extends GroupConverter {
    */
   @Override
   public void start() {
-    currentTuple = new VTuple(projectionMap.length);
+    Datum[] datums = new Datum[projectionMap.length];
+    Arrays.fill(datums, NullDatum.get());
+    currentTuple = new VTuple(datums);
   }
 
   /**
@@ -154,14 +157,6 @@ public class TajoRecordConverter extends GroupConverter {
    */
   @Override
   public void end() {
-    for (int i = 0; i < projectionMap.length; ++i) {
-      final int projectionIndex = projectionMap[i];
-      Column column = tajoReadSchema.getColumn(projectionIndex);
-      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
-          || currentTuple.isBlankOrNull(i)) {
-        set(projectionIndex, NullDatum.get());
-      }
-    }
   }
 
   /**

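The converter hunk above is the heart of the fix. currentTuple is only
projectionMap.length slots wide, while the removed end() wrote its NULLs at
projectionIndex, the column's ordinal in the full table schema; under a
projection those two index spaces diverge, which appears to be how NULLs went
missing (an inference from the diff rather than the ticket text). The removed
loop, annotated:

    for (int i = 0; i < projectionMap.length; ++i) {  // i: slot in the projected tuple
      final int projectionIndex = projectionMap[i];   // ordinal in the FULL table schema
      Column column = tajoReadSchema.getColumn(projectionIndex);
      if (column.getDataType().getType() == TajoDataTypes.Type.NULL_TYPE
          || currentTuple.isBlankOrNull(i)) {
        set(projectionIndex, NullDatum.get());        // index from the wrong space once
      }                                               // the projection is narrower
    }

Pre-filling every slot with NullDatum in start() removes the translation
entirely: converters overwrite only the slots Parquet actually delivers.
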
http://git-wip-us.apache.org/repos/asf/tajo/blob/80218d03/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
index 436159c..25610fc 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/TajoRecordMaterializer.java
@@ -18,8 +18,8 @@
 
 package org.apache.tajo.storage.parquet;
 
-import org.apache.tajo.catalog.Column;
 import org.apache.tajo.catalog.Schema;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.Tuple;
 import parquet.io.api.GroupConverter;
 import parquet.io.api.RecordMaterializer;
@@ -35,24 +35,13 @@ class TajoRecordMaterializer extends RecordMaterializer<Tuple> {
    * Creates a new TajoRecordMaterializer.
    *
    * @param parquetSchema The Parquet schema of the projection.
-   * @param tajoSchema The Tajo schema of the projection.
+   * @param tajoRequestSchema The Tajo schema of the projection.
    * @param tajoReadSchema The Tajo schema of the table.
    */
-  public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoSchema,
+  public TajoRecordMaterializer(MessageType parquetSchema, Schema tajoRequestSchema,
                                 Schema tajoReadSchema) {
-    int[] projectionMap = getProjectionMap(tajoReadSchema, tajoSchema);
-    this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema,
-                                        projectionMap);
-  }
-
-  private int[] getProjectionMap(Schema schema, Schema projection) {
-    Column[] targets = projection.toArray();
-    int[] projectionMap = new int[targets.length];
-    for (int i = 0; i < targets.length; ++i) {
-      int tid = schema.getColumnId(targets[i].getQualifiedName());
-      projectionMap[i] = tid;
-    }
-    return projectionMap;
+    int[] projectionMap = PlannerUtil.getTargetIds(tajoReadSchema, tajoRequestSchema.toArray());
+    this.root = new TajoRecordConverter(parquetSchema, tajoReadSchema, projectionMap);
   }
 
   /**

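The materializer now delegates to PlannerUtil.getTargetIds, which presumably
computes the same mapping the deleted helper built by hand: for each requested
column, its ordinal position in the table's read schema. In sketch form,
mirroring the deleted getProjectionMap (any extra validation or ordering inside
getTargetIds is not shown):

    // Map each projected column to its position in the full read schema,
    // looking the column up by its qualified name.
    int[] projectionMap = new int[targets.length];
    for (int i = 0; i < targets.length; ++i) {
      projectionMap[i] = tajoReadSchema.getColumnId(targets[i].getQualifiedName());
    }
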
http://git-wip-us.apache.org/repos/asf/tajo/blob/80218d03/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index c70e07c..c4514b9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -40,6 +40,7 @@ import org.apache.tajo.datum.DatumFactory;
 import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.datum.ProtobufDatumFactory;
 import org.apache.tajo.exception.ValueTooLongForTypeCharactersException;
+import org.apache.tajo.plan.util.PlannerUtil;
 import org.apache.tajo.storage.fragment.FileFragment;
 import org.apache.tajo.storage.fragment.Fragment;
 import org.apache.tajo.storage.rcfile.RCFile;
@@ -55,10 +56,9 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.Random;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 @RunWith(Parameterized.class)
 public class TestStorages {
@@ -541,6 +541,116 @@ public class TestStorages {
   }
 
   @Test
+  public void testNullHandlingTypesWithProjection() throws IOException {
+    if (internalType) return;
+
+    boolean handleProtobuf = !dataFormat.equalsIgnoreCase(BuiltinStorages.JSON);
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.BOOLEAN);
+    schema.addColumn("col2", Type.CHAR, 7);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+    schema.addColumn("col6", Type.FLOAT4);
+    schema.addColumn("col7", Type.FLOAT8);
+    schema.addColumn("col8", Type.TEXT);
+    schema.addColumn("col9", Type.BLOB);
+    schema.addColumn("col10", Type.INET4);
+    schema.addColumn("col11", Type.NULL_TYPE);
+
+    if (handleProtobuf) {
+      schema.addColumn("col12", CatalogUtil.newDataType(Type.PROTOBUF, TajoIdProtos.QueryIdProto.class.getName()));
+    }
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options);
+    meta.setPropertySet(CatalogUtil.newDefaultProperty(dataFormat));
+    meta.putProperty(StorageConstants.TEXT_NULL, "\\\\N");
+    meta.putProperty(StorageConstants.RCFILE_NULL, "\\\\N");
+    meta.putProperty(StorageConstants.RCFILE_SERDE, TextSerializerDeserializer.class.getName());
+    meta.putProperty(StorageConstants.SEQUENCEFILE_NULL, "\\");
+    if (dataFormat.equalsIgnoreCase("AVRO")) {
+      meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_NULL_HANDLING_TYPES_AVRO_SCHEMA);
+    }
+
+    Path tablePath = new Path(testDir, "testProjectedNullHandlingTypes.data");
+    FileTablespace sm = TablespaceManager.getLocalFs();
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+    QueryId queryid = new QueryId("12345", 5);
+    ProtobufDatumFactory factory = ProtobufDatumFactory.get(TajoIdProtos.QueryIdProto.class.getName());
+    int columnNum = 11 + (handleProtobuf ? 1 : 0);
+    VTuple seedTuple = new VTuple(columnNum);
+    seedTuple.put(new Datum[]{
+        DatumFactory.createBool(true),                // 0
+        DatumFactory.createChar("hyunsik"),           // 1
+        DatumFactory.createInt2((short) 17),          // 2
+        DatumFactory.createInt4(59),                  // 3
+        DatumFactory.createInt8(23L),                 // 4
+        DatumFactory.createFloat4(77.9f),             // 5
+        DatumFactory.createFloat8(271.9f),            // 6
+        DatumFactory.createText("hyunsik"),           // 7
+        DatumFactory.createBlob("hyunsik".getBytes()),// 8
+        DatumFactory.createInet4("192.168.0.1"),      // 9
+        NullDatum.get(),                              // 10
+    });
+
+    if (handleProtobuf) {
+      seedTuple.put(11, factory.createDatum(queryid.getProto()));       // 11
+    }
+
+    // Write one tuple per column: the i-th tuple has NULL in its i-th column
+    Tuple tuple;
+    for (int i = 0; i < columnNum; i++) {
+      tuple = new VTuple(columnNum);
+      for (int j = 0; j < columnNum; j++) {
+        if (i == j) { // i'th column will have NULL value
+          tuple.put(j, NullDatum.get());
+        } else {
+          tuple.put(j, seedTuple.get(j));
+        }
+      }
+      appender.addTuple(tuple);
+    }
+    appender.flush();
+    appender.close();
+
+
+    // Build a projection schema from a random subset of columns (skipping col1)
+    Schema target = new Schema();
+    Random random = new Random();
+    for (int i = 1; i < schema.size(); i++) {
+      int num = random.nextInt(schema.size() - 1) + 1;
+      if (i % num == 0) {
+        target.addColumn(schema.getColumn(i));
+      }
+    }
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = TablespaceManager.getLocalFs().getScanner(meta, schema, fragment, target);
+    scanner.init();
+
+    Tuple retrieved;
+    int[] targetIds = PlannerUtil.getTargetIds(schema, target.toArray());
+    int i = 0;
+    while ((retrieved = scanner.next()) != null) {
+      assertEquals(target.size(), retrieved.size());
+      for (int j = 0; j < targetIds.length; j++) {
+        if (i == targetIds[j]) {
+          assertEquals(NullDatum.get(), retrieved.asDatum(j));
+        } else {
+          assertEquals(seedTuple.get(targetIds[j]), retrieved.asDatum(j));
+        }
+      }
+      i++;
+    }
+    scanner.close();
+  }
+
+  @Test
   public void testRCFileTextSerializeDeserialize() throws IOException {
     if(!dataFormat.equalsIgnoreCase(BuiltinStorages.RCFILE)) return;
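
The new test's assertion scheme ties these pieces together: row i is written
with NULL exactly in table column i, and targetIds[j] recovers the table
ordinal behind projected position j, so row i must read NULL at projected
position j precisely when i == targetIds[j]. Restated compactly (equivalent to
the assertion loop in the hunk above):

    for (int j = 0; j < targetIds.length; j++) {
      boolean expectNull = (i == targetIds[j]);   // row i nulled table column i
      assertEquals(expectNull ? NullDatum.get() : seedTuple.get(targetIds[j]),
                   retrieved.asDatum(j));
    }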