You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by se...@apache.org on 2018/02/16 20:00:39 UTC

[6/9] trafodion git commit: [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for trafodion

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for trafodion

With this change, the hive regressions pass, except for hive/TEST003, when
the CQD use_libhdfs_scan is set to 'off'.


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/a187b03b
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/a187b03b
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/a187b03b

Branch: refs/heads/master
Commit: a187b03bc37d62d00278d21eef519496ea6ce1aa
Parents: 7066e3e
Author: selvaganesang <se...@esgyn.com>
Authored: Fri Feb 9 00:56:48 2018 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Fri Feb 9 00:56:48 2018 +0000

----------------------------------------------------------------------
 core/sql/executor/ExHdfsScan.cpp                       |  2 ++
 .../src/main/java/org/trafodion/sql/HDFSClient.java    | 12 ++++++++++--
 core/sql/src/main/java/org/trafodion/sql/HdfsScan.java | 13 ++++++++++---
 3 files changed, 22 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/a187b03b/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index f8ec9a1..cd95899 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -127,6 +127,8 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   Space * space = (glob ? glob->getSpace() : 0);
   CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
   useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
+  if (isSequenceFile())
+     useLibhdfsScan_ = TRUE;
   lobGlob_ = NULL;
   hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
   headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/a187b03b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 3b83c8f..5c8c487 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.conf.Configuration;
 import java.nio.ByteBuffer;
 import java.io.IOException;
+import java.io.EOFException;
 import java.io.OutputStream;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Future;
@@ -95,8 +96,14 @@ public class HDFSClient
       {
          int bytesRead;
          int totalBytesRead = 0;
-         if (! buf_.hasArray())
-            fsdis_.seek(pos_);
+         if (! buf_.hasArray()) {
+            try {
+              fsdis_.seek(pos_);
+            } catch (EOFException e) {
+              isEOF_ = 1;
+              return new Integer(totalBytesRead);
+            } 
+         }
          do
          {
             if (buf_.hasArray())
@@ -157,6 +164,7 @@ public class HDFSClient
       int bytesRead;
       retObject = (Integer)future_.get();
       bytesRead = retObject.intValue();
+      fsdis_.close();
       return bytesRead;
    }
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/a187b03b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index f3d505d..e216555 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -188,9 +188,16 @@ public class HdfsScan
       if (logger_.isDebugEnabled())
          logger_.debug(" Range No " + retArray[2] + " Buffer No " + retArray[1] + " Bytes Read " + retArray[0] + " isEOF " + retArray[3]); 
       lastBufCompleted_ = bufNo;
-      if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1))) {
-         scanCompleted_ = true;
-         return retArray;
+      if (isEOF == 1) {
+         if (currRange_ == (hdfsScanRanges_.length-1)) {
+            scanCompleted_ = true;
+            return retArray;
+         } else {
+            currRange_++;
+            currPos_ = hdfsScanRanges_[currRange_].pos_;
+            lenRemain_ = hdfsScanRanges_[currRange_].len_;
+            bytesRead = 0;
+         }
       }
       switch (lastBufCompleted_)
       {