You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/12/03 06:40:31 UTC

tajo git commit: TAJO-2001: DirectRawFileScanner.getProgress occasionally fails.

Repository: tajo
Updated Branches:
  refs/heads/master 3852ae3d0 -> 8d513a5e8


TAJO-2001: DirectRawFileScanner.getProgress occasionally fails.

Closes #894


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/8d513a5e
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/8d513a5e
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/8d513a5e

Branch: refs/heads/master
Commit: 8d513a5e8c8c8d5b69ba9630329736bb2bb7b86e
Parents: 3852ae3
Author: Jinho Kim <jh...@apache.org>
Authored: Thu Dec 3 14:39:46 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Thu Dec 3 14:39:46 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../java/org/apache/tajo/storage/RawFile.java   |  5 ++-
 .../storage/rawfile/DirectRawFileScanner.java   | 46 ++++++++------------
 3 files changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/8d513a5e/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 688c2f5..9903944 100644
--- a/CHANGES
+++ b/CHANGES
@@ -46,6 +46,8 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2001: DirectRawFileScanner.getProgress occasionally fails. (jinho)
+
     TAJO-1753: GlobalEngine causes NPE occurs occasionally. (jinho)
 
     TAJO-1862: TSQL gives user wrong URL of documentation. 

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d513a5e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
index 26bd135..f31b85c 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/RawFile.java
@@ -453,10 +453,11 @@ public class RawFile {
         return 1.0f;
       }
 
-      if (filePosition - startOffset == 0) {
+      long readBytes = filePosition - startOffset;
+      if (readBytes == 0) {
         return 0.0f;
       } else {
-        return Math.min(1.0f, ((float) filePosition / endOffset));
+        return Math.min(1.0f, ((float) readBytes / fragment.getLength()));
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/tajo/blob/8d513a5e/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
index 550de63..1e2380e 100644
--- a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/rawfile/DirectRawFileScanner.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.tajo.catalog.Schema;
 import org.apache.tajo.catalog.SchemaUtil;
 import org.apache.tajo.catalog.TableMeta;
+import org.apache.tajo.catalog.statistics.TableStats;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.plan.expr.EvalNode;
 import org.apache.tajo.storage.*;
@@ -50,7 +51,6 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   private SeekableInputChannel channel;
 
   private boolean eos = false;
-  private long fileSize;
   private long recordCount;
   private long filePosition;
   private long endOffset;
@@ -95,10 +95,8 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
       }
 
       channel = new LocalFileInputChannel(new FileInputStream(file));
-      fileSize = channel.size();
     } else {
       channel = new FSDataInputChannel(fs.open(fragment.getPath()));
-      fileSize = channel.size();
     }
 
     // initial set position
@@ -106,10 +104,6 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
       channel.seek(fragment.getStartKey());
     }
 
-    if (tableStats != null) {
-      tableStats.setNumBytes(fileSize);
-    }
-
     filePosition = fragment.getStartKey();
     endOffset = fragment.getStartKey() + fragment.getLength();
     if (LOG.isDebugEnabled()) {
@@ -178,7 +172,7 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   @Override
   public void close() throws IOException {
     if (tableStats != null) {
-      tableStats.setReadBytes(fileSize);
+      tableStats.setReadBytes(filePosition - fragment.getStartKey());
       tableStats.setNumRows(recordCount);
     }
     if(tupleBuffer != null) {
@@ -211,30 +205,28 @@ public class DirectRawFileScanner extends FileScanner implements SeekableScanner
   }
 
   @Override
+  public TableStats getInputStats() {
+    if(tableStats != null){
+      tableStats.setNumRows(recordCount);
+      tableStats.setReadBytes(filePosition - fragment.getStartKey()); // actual read bytes (scan + rescan * n)
+      tableStats.setNumBytes(fragment.getLength());
+    }
+    return tableStats;
+  }
+
+  @Override
   public float getProgress() {
     if(!inited) return 0.0f;
 
-    try {
-      tableStats.setNumRows(recordCount);
-      long filePos = 0;
-      if (channel != null) {
-        filePos = channel.position();
-        tableStats.setReadBytes(filePos);
-      }
-
-      if(eos || channel == null) {
-        tableStats.setReadBytes(fileSize);
-        return 1.0f;
-      }
+    if(eos) {
+      return 1.0f;
+    }
 
-      if (filePos == 0) {
-        return 0.0f;
-      } else {
-        return Math.min(1.0f, ((float)filePos / (float)fileSize));
-      }
-    } catch (IOException e) {
-      LOG.error(e.getMessage(), e);
+    long readBytes = filePosition - fragment.getStartKey();
+    if (readBytes == 0) {
       return 0.0f;
+    } else {
+      return Math.min(1.0f, ((float) readBytes / fragment.getLength()));
     }
   }
 }