You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2015/05/07 18:36:02 UTC

[3/6] tajo git commit: TAJO-1534: DelimitedTextFile returns null instead of a NullDatum. (jinho)

TAJO-1534: DelimitedTextFile returns null instead of a NullDatum. (jinho)

Closes #522


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/b6b9d463
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/b6b9d463
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/b6b9d463

Branch: refs/heads/index_support
Commit: b6b9d46318d0aaa0346578b5ecf11b9d9ba74b0c
Parents: 4755410
Author: Jinho Kim <jh...@apache.org>
Authored: Wed May 6 12:11:37 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Wed May 6 12:11:37 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/storage/rcfile/RCFile.java  | 14 ++---
 .../tajo/storage/text/CSVLineDeserializer.java  | 18 +++++-
 .../org/apache/tajo/storage/TestStorages.java   | 59 ++++++++++++++++++++
 4 files changed, 83 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 952f852..a790655 100644
--- a/CHANGES
+++ b/CHANGES
@@ -108,6 +108,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1534: DelimitedTextFile return null instead of a NullDatum. (jinho)
+
     TAJO-1574: Fix NPE on natural join.
     (Contributed by Dongjoon Hyun, Committed by jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
index 44aabd4..62e5ed9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rcfile/RCFile.java
@@ -1255,17 +1255,18 @@ public class RCFile {
 
       for (int i = 0; i < targetColumnIndexes.length; i++) {
         int tid = targetColumnIndexes[i];
+        SelectedColumn col = new SelectedColumn();
+        col.colIndex = tid;
         if (tid < columnNumber) {
           skippedColIDs[tid] = false;
-
-          SelectedColumn col = new SelectedColumn();
-          col.colIndex = tid;
           col.runLength = 0;
           col.prvLength = -1;
           col.rowReadIndex = 0;
-          selectedColumns[i] = col;
           colValLenBufferReadIn[i] = new NonSyncDataInputBuffer();
+        }  else {
+          col.isNulled = true;
         }
+        selectedColumns[i] = col;
       }
 
       currentKey = createKeyBuffer();
@@ -1583,10 +1584,7 @@ public class RCFile {
 
       for (int selIx = 0; selIx < selectedColumns.length; selIx++) {
         SelectedColumn col = selectedColumns[selIx];
-        if (col == null) {
-          col = new SelectedColumn();
-          col.isNulled = true;
-          selectedColumns[selIx] = col;
+        if (col.isNulled) {
           continue;
         }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
index 6a8c7a9..03a0a26 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/CSVLineDeserializer.java
@@ -23,6 +23,7 @@ import io.netty.buffer.ByteBufProcessor;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.TableMeta;
 import org.apache.tajo.datum.Datum;
+import org.apache.tajo.datum.NullDatum;
 import org.apache.tajo.storage.FieldSerializerDeserializer;
 import org.apache.tajo.storage.Tuple;
 
@@ -80,8 +81,14 @@ public class CSVLineDeserializer extends TextLineDeserializer {
 
       if (projection.length > currentTarget && currentIndex == projection[currentTarget]) {
         lineBuf.setIndex(start, start + fieldLength);
-        Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
-        output.put(currentIndex, datum);
+
+        try {
+          Datum datum = fieldSerDer.deserialize(lineBuf, schema.getColumn(currentIndex), currentIndex, nullChars);
+          output.put(currentIndex, datum);
+        } catch (Exception e) {
+          output.put(currentIndex, NullDatum.get());
+        }
+
         currentTarget++;
       }
 
@@ -92,6 +99,13 @@ public class CSVLineDeserializer extends TextLineDeserializer {
       start = end + 1;
       currentIndex++;
     }
+
+    /* If a text row is less than table schema size, tuple should set to NullDatum */
+    if (projection.length > currentTarget) {
+      for (; currentTarget < projection.length; currentTarget++) {
+        output.put(projection[currentTarget], NullDatum.get());
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/tajo/blob/b6b9d463/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index 790ac4a..456ea00 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -952,4 +952,63 @@ public class TestStorages {
       StorageManager.clearCache();
     }
   }
+
+  @Test
+  public void testLessThanSchemaSize() throws IOException {
+    /* RAW is internal storage. It must be same with schema size */
+    if (storeType == StoreType.RAW || storeType == StoreType.AVRO){
+      return;
+    }
+
+    Schema dataSchema = new Schema();
+    dataSchema.addColumn("col1", Type.FLOAT4);
+    dataSchema.addColumn("col2", Type.FLOAT8);
+    dataSchema.addColumn("col3", Type.INT2);
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(storeType, options);
+    meta.setOptions(CatalogUtil.newPhysicalProperties(storeType));
+
+    Path tablePath = new Path(testDir, "testLessThanSchemaSize.data");
+    FileStorageManager sm = (FileStorageManager) StorageManager.getFileStorageManager(conf);
+    Appender appender = sm.getAppender(meta, dataSchema, tablePath);
+    appender.init();
+
+
+    Tuple expect = new VTuple(dataSchema.size());
+    expect.put(new Datum[]{
+        DatumFactory.createFloat4(Float.MAX_VALUE),
+        DatumFactory.createFloat8(Double.MAX_VALUE),
+        DatumFactory.createInt2(Short.MAX_VALUE)
+    });
+
+    appender.addTuple(expect);
+    appender.flush();
+    appender.close();
+
+    assertTrue(fs.exists(tablePath));
+    FileStatus status = fs.getFileStatus(tablePath);
+    Schema inSchema = new Schema();
+    inSchema.addColumn("col1", Type.FLOAT4);
+    inSchema.addColumn("col2", Type.FLOAT8);
+    inSchema.addColumn("col3", Type.INT2);
+    inSchema.addColumn("col4", Type.INT4);
+    inSchema.addColumn("col5", Type.INT8);
+
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner = StorageManager.getFileStorageManager(conf).getScanner(meta, inSchema, fragment);
+
+    Schema target = new Schema();
+
+    target.addColumn("col2", Type.FLOAT8);
+    target.addColumn("col5", Type.INT8);
+    scanner.setTarget(target.toArray());
+    scanner.init();
+
+    Tuple tuple = scanner.next();
+    scanner.close();
+
+    assertEquals(expect.get(1), tuple.get(1));
+    assertEquals(NullDatum.get(), tuple.get(4));
+  }
 }