You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2016/01/12 03:21:08 UTC

tajo git commit: TAJO-2038: NPE in DelimitedTextFileScanner#getProgress.

Repository: tajo
Updated Branches:
  refs/heads/master 64c31a226 -> f62d34bea


TAJO-2038: NPE in DelimitedTextFileScanner#getProgress.

Closes #930


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f62d34be
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f62d34be
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f62d34be

Branch: refs/heads/master
Commit: f62d34beab7fba2da6a54ad88eb37fed093c37b5
Parents: 64c31a2
Author: Jinho Kim <jh...@apache.org>
Authored: Tue Jan 12 11:20:14 2016 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Tue Jan 12 11:20:14 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../apache/tajo/storage/avro/AvroScanner.java   | 11 +++
 .../org/apache/tajo/storage/orc/ORCScanner.java |  5 +-
 .../tajo/storage/parquet/ParquetScanner.java    | 10 +++
 .../sequencefile/SequenceFileScanner.java       | 25 +++++++
 .../tajo/storage/text/DelimitedTextFile.java    | 76 +++++++++++---------
 .../parquet/InternalParquetRecordReader.java    |  2 +-
 .../thirdparty/parquet/ParquetReader.java       |  8 +++
 .../org/apache/tajo/storage/TestStorages.java   | 53 ++++++++++++++
 9 files changed, 154 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ffe3c6d..b1a85a8 100644
--- a/CHANGES
+++ b/CHANGES
@@ -78,6 +78,8 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2038: NPE in FileScanner#getProgress. (jinho)
+
     TAJO-2034: Files required for executing python functions are not copied in 
     testEval(). (jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
index afa2701..ad48850 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/avro/AvroScanner.java
@@ -292,4 +292,15 @@ public class AvroScanner extends FileScanner {
   public boolean isSplittable() {
     return false;
   }
+
+  @Override
+  public float getProgress() {
+    if (!inited) return super.getProgress();
+
+    if (!dataFileReader.hasNext()) {
+      return 1.0f;
+    } else {
+      return 0.0f;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
index 32a2aaa..9351c59 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/orc/ORCScanner.java
@@ -113,8 +113,6 @@ public class ORCScanner extends FileScanner {
       targets = schema.toArray();
     }
 
-    super.init();
-
     outTuple = new VTuple(targets.length);
 
     Path path = fragment.getPath();
@@ -163,6 +161,7 @@ public class ORCScanner extends FileScanner {
     recordReader = orcReader.createRecordReader(columnSet, OrcPredicate.TRUE,
         fragment.getStartKey(), fragment.getLength(), DateTimeZone.forTimeZone(timezone));
 
+    super.init();
     LOG.debug("file fragment { path: " + fragment.getPath() +
       ", start offset: " + fragment.getStartKey() +
       ", length: " + fragment.getLength() + "}");
@@ -307,6 +306,8 @@ public class ORCScanner extends FileScanner {
 
   @Override
   public float getProgress() {
+    if(!inited) return super.getProgress();
+
     return recordReader.getProgress();
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
index ef74a90..d7f753c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/parquet/ParquetScanner.java
@@ -124,4 +124,14 @@ public class ParquetScanner extends FileScanner {
   public boolean isSplittable() {
     return false;
   }
+
+  @Override
+  public float getProgress() {
+
+    if (!inited) {
+      return super.getProgress();
+    } else {
+      return reader.getProgress();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
index 37cffdb..9ad5ab3 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/sequencefile/SequenceFileScanner.java
@@ -361,4 +361,29 @@ public class SequenceFileScanner extends FileScanner {
   public boolean isSplittable(){
     return true;
   }
+
+  @Override
+  public float getProgress() {
+    if (!inited) return super.getProgress();
+
+    if (!more) {
+      return 1.0f;
+    } else {
+      long filePos;
+      float progress;
+      try {
+        filePos = reader.getPosition();
+        if (start == filePos) {
+          progress = 0.0f;
+        } else {
+          long readBytes = filePos - start;
+          long remainingBytes = Math.max(end - filePos, 0);
+          progress = Math.min(1.0f, (float) (readBytes) / (float) (readBytes + remainingBytes));
+        }
+      } catch (IOException e) {
+        progress = 0.0f;
+      }
+      return progress;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
index 46d7f6a..12ab738 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/text/DelimitedTextFile.java
@@ -300,50 +300,19 @@ public class DelimitedTextFile {
 
     @Override
     public void init() throws IOException {
-      if (reader != null) {
-        reader.close();
-      }
-
-      if(deserializer != null) {
-        deserializer.release();
-      }
 
-      reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
       reader.init();
-      recordCount = 0;
 
       if (targets == null) {
         targets = schema.toArray();
       }
 
-      outTuple = new VTuple(targets.length);
+      reset();
 
       super.init();
       if (LOG.isDebugEnabled()) {
         LOG.debug("DelimitedTextFileScanner open:" + fragment.getPath() + "," + startOffset + "," + endOffset);
       }
-
-      // skip first line if it reads from middle of file
-      if (startOffset > 0) {
-        reader.readLine();
-      } else { // skip header lines if it is defined
-
-        // initialization for skipping header(max 20)
-        int headerLineNum = Math.min(Integer.parseInt(meta.getProperty(StorageConstants.TEXT_SKIP_HEADER_LINE, "0")), 20);
-        if (headerLineNum > 0) {
-          LOG.info(String.format("Skip %d header lines", headerLineNum));
-          for (int i = 0; i < headerLineNum; i++) {
-            if (!reader.isReadable()) {
-              return;
-            }
-
-            reader.readLine();
-          }
-        }
-      }
-
-      deserializer = getLineSerde().createDeserializer(schema, meta, targets);
-      deserializer.init();
     }
 
     public TextLineSerDe getLineSerde() {
@@ -436,7 +405,44 @@ public class DelimitedTextFile {
 
     @Override
     public void reset() throws IOException {
-      init();
+      recordCount = 0;
+
+      if (reader.getReadBytes() > 0) {
+        reader.close();
+
+        reader = new DelimitedLineReader(conf, fragment, conf.getInt(READ_BUFFER_SIZE, 128 * StorageUnit.KB));
+        reader.init();
+      }
+
+      if(deserializer != null) {
+        deserializer.release();
+      }
+
+      deserializer = getLineSerde().createDeserializer(schema, meta, targets);
+      deserializer.init();
+
+      outTuple = new VTuple(targets.length);
+
+      // skip first line if it reads from middle of file
+      if (startOffset > 0) {
+        reader.readLine();
+      } else { // skip header lines if it is defined
+
+        // initialization for skipping header(max 20)
+        int headerLineNum = Math.min(Integer.parseInt(
+            meta.getProperty(StorageConstants.TEXT_SKIP_HEADER_LINE, "0")), 20);
+
+        if (headerLineNum > 0) {
+          LOG.info(String.format("Skip %d header lines", headerLineNum));
+          for (int i = 0; i < headerLineNum; i++) {
+            if (!reader.isReadable()) {
+              return;
+            }
+
+            reader.readLine();
+          }
+        }
+      }
     }
 
     @Override
@@ -446,16 +452,16 @@ public class DelimitedTextFile {
           deserializer.release();
         }
 
-        if (tableStats != null && reader != null) {
+        if (reader != null) {
           tableStats.setReadBytes(reader.getReadBytes());  //Actual Processed Bytes. (decompressed bytes + overhead)
           tableStats.setNumRows(recordCount);
         }
+
         if (LOG.isDebugEnabled()) {
           LOG.debug("DelimitedTextFileScanner processed record:" + recordCount);
         }
       } finally {
         IOUtils.cleanup(LOG, reader);
-        reader = null;
         outTuple = null;
       }
     }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
index 10ac6de..5beba14 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/InternalParquetRecordReader.java
@@ -134,7 +134,7 @@ class InternalParquetRecordReader<T> {
     return currentValue;
   }
 
-  public float getProgress() throws IOException, InterruptedException {
+  public float getProgress() {
     return (float) current / total;
   }
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
index 739686f..c353a81 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/parquet/ParquetReader.java
@@ -143,4 +143,12 @@ public class ParquetReader<T> implements Closeable {
       reader.close();
     }
   }
+
+  public float getProgress() {
+    if (!footersIterator.hasNext()) {
+      return 1.0f;
+    } else {
+      return reader != null ? reader.getProgress() : 0.0f;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/f62d34be/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
index c4514b9..a08dfb9 100644
--- a/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
+++ b/tajo-storage/tajo-storage-hdfs/src/test/java/org/apache/tajo/storage/TestStorages.java
@@ -48,6 +48,7 @@ import org.apache.tajo.storage.sequencefile.SequenceFileScanner;
 import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.JavaResourceUtil;
 import org.apache.tajo.util.KeyValueSet;
+import org.junit.After;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -154,6 +155,11 @@ public class TestStorages {
     });
   }
 
+  @After
+  public void tearDown() throws IOException {
+   fs.delete(testDir, true);
+  }
+
   @Test
   public void testSplitable() throws IOException {
     if (splitable) {
@@ -1303,4 +1309,51 @@ public class TestStorages {
       IOUtils.cleanup(null, appender);
     }
   }
+
+  @Test
+  public void testProgress() throws IOException {
+
+    Schema schema = new Schema();
+    schema.addColumn("col1", Type.FLOAT4);
+    schema.addColumn("col2", Type.FLOAT8);
+    schema.addColumn("col3", Type.INT2);
+    schema.addColumn("col4", Type.INT4);
+    schema.addColumn("col5", Type.INT8);
+
+    KeyValueSet options = new KeyValueSet();
+    TableMeta meta = CatalogUtil.newTableMeta(dataFormat, options);
+    if (dataFormat.equalsIgnoreCase(BuiltinStorages.AVRO)) {
+      meta.putProperty(StorageConstants.AVRO_SCHEMA_LITERAL, TEST_MAX_VALUE_AVRO_SCHEMA);
+    }
+
+    FileTablespace sm = TablespaceManager.getLocalFs();
+    Path tablePath = new Path(testDir, "testProgress.data");
+    Appender appender = sm.getAppender(meta, schema, tablePath);
+    appender.init();
+
+    VTuple tuple = new VTuple(new Datum[]{
+        DatumFactory.createFloat4(Float.MAX_VALUE),
+        DatumFactory.createFloat8(Double.MAX_VALUE),
+        DatumFactory.createInt2(Short.MAX_VALUE),
+        DatumFactory.createInt4(Integer.MAX_VALUE),
+        DatumFactory.createInt8(Long.MAX_VALUE)
+    });
+
+    appender.addTuple(tuple);
+    appender.flush();
+    appender.close();
+
+    FileStatus status = fs.getFileStatus(tablePath);
+    FileFragment fragment = new FileFragment("table", tablePath, 0, status.getLen());
+    Scanner scanner =  sm.getScanner(meta, schema, fragment, null);
+
+    assertEquals(0.0f, scanner.getProgress(), 0.0f);
+
+    scanner.init();
+    assertNotNull(scanner.next());
+    assertNull(null, scanner.next());
+
+    scanner.close();
+    assertEquals(1.0f, scanner.getProgress(), 0.0f);
+  }
 }
\ No newline at end of file