Posted to commits@trafodion.apache.org by se...@apache.org on 2018/02/16 20:00:37 UTC

[4/9] trafodion git commit: [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text format hive tables

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text format hive tables

Part-3 changes

Changes to ensure that multiple chunks and ranges of the HDFS scan
work correctly with the refactored code.

Pending issues:
Statistics need to be populated.
ESPs should be assigned their ranges in advance to avoid duplicate scans.


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/202a040e
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/202a040e
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/202a040e

Branch: refs/heads/master
Commit: 202a040ea9eb8ae3ea79329ee14048c4fe2f082c
Parents: f17e15e
Author: selvaganesang <se...@esgyn.com>
Authored: Fri Feb 2 07:46:48 2018 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Fri Feb 2 07:46:48 2018 +0000

----------------------------------------------------------------------
 core/sql/executor/ExHdfsScan.cpp                | 54 ++++++++----
 core/sql/executor/ExHdfsScan.h                  |  5 +-
 .../main/java/org/trafodion/sql/HDFSClient.java | 86 +++++++-------------
 .../main/java/org/trafodion/sql/HdfsScan.java   | 69 +++++++++-------
 4 files changed, 113 insertions(+), 101 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index e29baf6..730f0dc 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -143,7 +143,7 @@ ExHdfsScanTcb::ExHdfsScanTcb(
      }
      bufBegin_ = NULL;
      bufEnd_ = NULL;
-     logicalBufEnd_ = NULL;
+     bufLogicalEnd_ = NULL;
      headRoomCopied_ = 0;
      prevRangeNum_ = -1;
      currRangeBytesRead_ = 0;
@@ -567,11 +567,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
              } 
              bufBegin_ = NULL;
              bufEnd_ = NULL;
-             logicalBufEnd_ = NULL;
+             bufLogicalEnd_ = NULL;
              headRoomCopied_ = 0;
              prevRangeNum_ = -1;                         
              currRangeBytesRead_ = 0;                   
              recordSkip_ = FALSE;
+             extraBytesRead_ = 0;
              step_ = TRAF_HDFS_READ;
           } 
           break;
@@ -589,25 +590,28 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 break;
              } 
              // Assign the starting address of the buffer
+             hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
              bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
-             if (retArray_[IS_EOF])
-                logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
-             else if (retArray_[BYTES_COMPLETED] < hdfsScanBufMaxSize_) 
-                logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - headRoom_;   
-             else
-                logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
-             hdfo_ = getRange(retArray_[RANGE_NO]);
              if (retArray_[RANGE_NO] != prevRangeNum_) {  
+                currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
                 bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
-                if (hdfo_->getStartOffset() == 0)
+                if (hdfo->getStartOffset() == 0)
                    recordSkip_ = FALSE;
                 else
                    recordSkip_ = TRUE; 
              } else {
+                currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
                 bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
                 recordSkip_ = FALSE;
-             } 
+             }
+             if (currRangeBytesRead_ > hdfo->getBytesToRead())
+                extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead(); 
+             else
+                extraBytesRead_ = 0;
+             bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
+ 
              prevRangeNum_ = retArray_[RANGE_NO];
+             headRoomCopied_ = 0;
              if (recordSkip_) {
 		hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
 			      hdfsScanTdb().recordDelimiter_, 
@@ -628,6 +632,13 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
           break;
         case COPY_TAIL_TO_HEAD:
           {
+             BYTE *headRoomStartAddr;
+             headRoomCopied_ = bufEnd_ - (BYTE *)hdfsBufNextRow_;
+             if (retArray_[BUF_NO] == 0)
+                headRoomStartAddr = hdfsScanBuf_[1].buf_ - headRoomCopied_;
+             else
+                headRoomStartAddr = hdfsScanBuf_[0].buf_ - headRoomCopied_;
+             memcpy(headRoomStartAddr, hdfsBufNextRow_, headRoomCopied_);
              step_ = TRAF_HDFS_READ;  
           }
           break;
@@ -1023,6 +1034,10 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 
 	case PROCESS_HDFS_ROW:
 	{
+          if (!useLibhdfsScan_ && hdfsBufNextRow_ == NULL) {
+             step_ = TRAF_HDFS_READ;
+             break;
+          } 
 	  exception_ = FALSE;
 	  nextStep_ = NOT_STARTED;
 	  debugPenultimatePrevRow_ = debugPrevRow_;
@@ -1066,9 +1081,20 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 	  }
 	  else
 	  {
-	    numBytesProcessedInRange_ +=
+            if (useLibhdfsScan_) {
+	       numBytesProcessedInRange_ +=
 	        startOfNextRow - hdfsBufNextRow_;
-	    hdfsBufNextRow_ = startOfNextRow;
+	       hdfsBufNextRow_ = startOfNextRow;
+             } 
+             else {
+                if ((BYTE *)startOfNextRow >= bufLogicalEnd_) {
+                   step_ = TRAF_HDFS_READ;
+                   hdfsBufNextRow_ = NULL;
+                }
+                else
+	          hdfsBufNextRow_ = startOfNextRow;
+             }
+           
 	  }
 
 	  if (exception_)
@@ -1691,7 +1717,7 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err,
   }
   else {
      sourceDataEnd = (const char *)bufEnd_;
-     endOfRequestedRange = (const char *)logicalBufEnd_;
+     endOfRequestedRange = NULL;
   }
   hdfsLoggingRow_ = hdfsBufNextRow_;
   if (asciiSourceTD->numAttrs() == 0)
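
The COPY_TAIL_TO_HEAD state above is worth spelling out: when row processing
reaches the end of a buffer mid-row, the partial row at the tail is copied
into the head room that sits immediately in front of the other scan buffer,
so the row becomes contiguous once the next read completes. A minimal Java
sketch of that copy (buffer sizes and names are assumptions for
illustration, not the actual Trafodion code):

   // Illustrative sketch of the tail-to-head head-room copy; HEAD_ROOM and
   // BUF_SIZE are assumed stand-ins for rangeTailIOSize_/hdfsScanBufMaxSize_.
   public class TailToHeadSketch
   {
      static final int HEAD_ROOM = 16 * 1024;
      static final int BUF_SIZE  = 64 * 1024;

      // Each raw allocation has HEAD_ROOM bytes in front of the read area,
      // mirroring the layout behind hdfsScanBuf_[i].buf_.
      static byte[][] raw = { new byte[HEAD_ROOM + BUF_SIZE],
                              new byte[HEAD_ROOM + BUF_SIZE] };

      // Copy the incomplete row at the tail of buffer bufNo into the head
      // room of the other buffer; the return value plays the role of
      // headRoomCopied_, and the next pass starts that many bytes before
      // the other buffer's read area (bufBegin_ = buf_ - headRoomCopied_).
      static int copyTailToHead(int bufNo, int rowStart, int bytesCompleted)
      {
         int otherBuf = (bufNo == 0) ? 1 : 0;
         int tailLen = bytesCompleted - rowStart;  // bufEnd_ - hdfsBufNextRow_
         System.arraycopy(raw[bufNo], HEAD_ROOM + rowStart,
                          raw[otherBuf], HEAD_ROOM - tailLen, tailLen);
         return tailLen;
      }
   }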

http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/executor/ExHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h
index 62bb11e..2570a58 100644
--- a/core/sql/executor/ExHdfsScan.h
+++ b/core/sql/executor/ExHdfsScan.h
@@ -119,7 +119,7 @@ private:
          a) filename
          b) offset
          c) len
-      Java layer always reads more than the len by rangeTailIOSize_ to accomdate the record split 
+      Java layer always reads more than the len by rangeTailIOSize_ to accommodate the record split
   2. Two ByteBuffer objects are also passed to the HdfsScan object. These ByteBuffers are backed by
      2 native buffers into which the data is fetched. Each buffer has a head room of size rangeTailIOSize_ and the
      data is always read after the head room.
@@ -347,11 +347,12 @@ protected:
   int retArray_[4];
   BYTE *bufBegin_;
   BYTE *bufEnd_;
-  BYTE *logicalBufEnd_;
+  BYTE *bufLogicalEnd_;
   long currRangeBytesRead_;
   int headRoomCopied_;
   int headRoom_;
   int prevRangeNum_;
+  int extraBytesRead_;
   NABoolean recordSkip_;
 };
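
A short sketch of the setup the header comment above describes: each range
reaches the Java layer as a (filename, offset, len) triple, and the two
native buffers are allocated with rangeTailIOSize_ bytes of head room in
front of the region reads land in. The Range class and the constants here
are illustrative assumptions, not the real interface:

   import java.nio.ByteBuffer;

   public class ScanBufferSetup
   {
      static final int RANGE_TAIL_IO_SIZE = 16 * 1024;        // assumed head-room size
      static final int SCAN_BUF_MAX_SIZE  = 64 * 1024 * 1024; // assumed buffer size

      // Illustrative stand-in for the (filename, offset, len) range triple.
      static class Range {
         String filename; long offset; long len;
         Range(String f, long o, long l) { filename = f; offset = o; len = l; }
      }

      public static void main(String[] args)
      {
         // Two direct buffers; reads are delivered after the head room so a
         // partial row carried over from the other buffer can be placed
         // immediately in front of the newly read data.
         ByteBuffer[] buf = new ByteBuffer[2];
         for (int i = 0; i < 2; i++) {
            buf[i] = ByteBuffer.allocateDirect(RANGE_TAIL_IO_SIZE + SCAN_BUF_MAX_SIZE);
            buf[i].position(RANGE_TAIL_IO_SIZE);  // data region starts here
         }
         Range r = new Range("/hive/warehouse/t1/part-00000", 0, 128L * 1024 * 1024);
         System.out.println("Range: " + r.filename + " @" + r.offset + " len " + r.len);
      }
   }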
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 1af2c49..3b83c8f 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -87,31 +87,45 @@ public class HDFSClient
 
    class HDFSRead implements Callable 
    {
-      int length_;
-
-      HDFSRead(int length) 
+      HDFSRead() 
       {
-         length_ = length;
       }
  
       public Object call() throws IOException 
       {
          int bytesRead;
-         if (buf_.hasArray())
-            bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
-         else
+         int totalBytesRead = 0;
+         if (! buf_.hasArray())
+            fsdis_.seek(pos_);
+         do
          {
-            buf_.limit(bufOffset_ + length_);
-            bytesRead = fsdis_.read(buf_);
-         }
-         return new Integer(bytesRead);
+            if (buf_.hasArray())
+               bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_);
+            else 
+               bytesRead = fsdis_.read(buf_);
+            if (bytesRead == -1) {
+               isEOF_ = 1;
+               break;
+            }
+            if (bytesRead == 0)
+               break;
+            totalBytesRead += bytesRead;          
+            if (totalBytesRead == bufLen_)
+                break;
+            bufOffset_ += bytesRead;
+            pos_ += bytesRead;
+            lenRemain_ -= bytesRead;
+         } while (lenRemain_ > 0);
+         return new Integer(totalBytesRead);
       }
    }
        
    public HDFSClient() 
    {
    }
- 
+
+   // This constructor enables the hdfs data to be read in another thread while the previously 
+   // read buffer is being processed by the SQL engine 
    public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
    {
       bufNo_ = bufNo; 
@@ -127,44 +141,24 @@ public class HDFSClient
       len_ = length;
       if (buffer.hasArray()) 
          bufLen_ = buffer.array().length;
-      else
-      {
+      else {
          bufLen_ = buffer.capacity();
          buf_.position(0);
       }
       lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
-      if (lenRemain_ != 0)
-      {
-         int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
-         future_ = executorService_.submit(new HDFSRead(readLength));
+      if (lenRemain_ != 0) {
+         future_ = executorService_.submit(new HDFSRead());
       }
    }
 
-   public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
    {
       Integer retObject = 0;
       int bytesRead;
-      int readLength;
-       
-      if (lenRemain_ == 0)
-         return 0;
       retObject = (Integer)future_.get();
       bytesRead = retObject.intValue();
-      if (bytesRead == -1)
-         return -1;
-      bufOffset_ += bytesRead;
-      pos_ += bytesRead;
-      lenRemain_ -= bytesRead;
-      if (bufOffset_ == bufLen_)
-         return bytesRead; 
-      else if (bufOffset_ > bufLen_)
-         throw new IOException("Internal Error in trafHdfsRead ");
-      if (lenRemain_ == 0)
-         return bytesRead; 
-      readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
-      future_ = executorService_.submit(new HDFSRead(readLength));
       return bytesRead;
-   } 
+   }
 
    public int getRangeNo()
    {
@@ -176,24 +170,6 @@ public class HDFSClient
       return isEOF_;
    }
 
-   public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
-   {
-      int bytesRead;
-      int totalBytesRead = 0;
-      while (true) {
-         bytesRead = trafHdfsRead();
-         if (bytesRead == -1) {
-            isEOF_ = 1;
-            return totalBytesRead;
-         }
-         if (bytesRead == 0)
-            return totalBytesRead;
-         totalBytesRead += bytesRead;
-         if (totalBytesRead == bufLen_)
-              return totalBytesRead;
-      }  
-   } 
-
    boolean hdfsCreate(String fname , boolean compress) throws IOException
    {
      if (logger_.isDebugEnabled()) 
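
The reworked HDFSRead.call() above folds the old trafHdfsRead() and
trafHdfsReadBuffer() pair into one loop, because a positional read can
legitimately return fewer bytes than requested and must be retried until
the buffer is full or EOF. A generic sketch of that short-read loop,
written against java.nio.channels.FileChannel rather than the Hadoop
stream, so the names differ from the real code:

   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.nio.channels.FileChannel;

   public class FillLoopSketch
   {
      // Read up to lenRemain bytes starting at pos, looping over short reads;
      // returns the total bytes read, with eof[0] set when end-of-file is hit
      // (mirrors totalBytesRead and isEOF_ in HDFSClient).
      static int readFully(FileChannel ch, ByteBuffer buf, long pos,
                           int lenRemain, boolean[] eof) throws IOException
      {
         // Cap the buffer so we never read past the requested length.
         buf.limit(Math.min(buf.capacity(), buf.position() + lenRemain));
         int totalBytesRead = 0;
         while (buf.hasRemaining()) {
            int bytesRead = ch.read(buf, pos + totalBytesRead); // positional read
            if (bytesRead == -1) { eof[0] = true; break; }      // end of file
            if (bytesRead == 0) break;                          // no progress
            totalBytesRead += bytesRead;
         }
         return totalBytesRead;
      }
   }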

http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index 9fb145e..73ceda8 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -65,7 +65,6 @@ public class HdfsScan
    private long lenRemain_;
    private int lastBufCompleted_ = -1;
    private boolean scanCompleted_;
-   private boolean lastScanRangeScheduled_;
    
    class HdfsScanRange 
    {
@@ -113,41 +112,44 @@ public class HdfsScan
       }
       if (hdfsScanRanges_.length > 0) {
          currRange_ = 0;
-         currPos_ = hdfsScanRanges_[0].pos_;
-         lenRemain_ = hdfsScanRanges_[0].len_;
-         hdfsScanRange(0);
+         currPos_ = hdfsScanRanges_[currRange_].pos_;
+         lenRemain_ = hdfsScanRanges_[currRange_].len_; 
+         hdfsScanRange(0, 0);
       }
       scanCompleted_ = false;
-      lastScanRangeScheduled_ = false;
    }
 
-   public void hdfsScanRange(int bufNo) throws IOException
+   public void hdfsScanRange(int bufNo, int bytesCompleted) throws IOException
    {
-      if (logger_.isDebugEnabled())
-         logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo); 
+      lenRemain_ -= bytesCompleted;
+      currPos_ += bytesCompleted; 
       int readLength;
-      if (lenRemain_ > bufLen_[bufNo])
-         readLength = bufLen_[bufNo];
-      else
-         readLength = (int)lenRemain_;
-      hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
-      lenRemain_ -= readLength;
-      currPos_ += readLength; 
-      if (lenRemain_ == 0) {
-         if (currRange_  == (hdfsScanRanges_.length-1)) 
-            lastScanRangeScheduled_ = true;
+      if (lenRemain_ <= 0) {
+         if (currRange_  == (hdfsScanRanges_.length-1)) {
+            scanCompleted_ = true;
+            return;
+         }
          else {
             currRange_++;
             currPos_ = hdfsScanRanges_[currRange_].pos_;
             lenRemain_ = hdfsScanRanges_[currRange_].len_; 
          }
       } 
+      if (lenRemain_ > bufLen_[bufNo])
+         readLength = bufLen_[bufNo];
+      else
+         readLength = (int)lenRemain_;
+      if (! scanCompleted_) {
+         if (logger_.isDebugEnabled())
+            logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo); 
+         hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
+      }
    } 
    
    public int[] trafHdfsRead() throws IOException, InterruptedException, ExecutionException
    {
       int[] retArray;
-      int byteCompleted;
+      int bytesRead;
       int bufNo;
       int rangeNo;
       int isEOF;
@@ -160,44 +162,41 @@ public class HdfsScan
       switch (lastBufCompleted_) {
          case -1:
          case 1:
-            byteCompleted = hdfsClient_[0].trafHdfsReadBuffer(); 
+            bytesRead = hdfsClient_[0].trafHdfsReadBuffer(); 
             bufNo = 0;
             rangeNo = hdfsClient_[0].getRangeNo();
             isEOF = hdfsClient_[0].isEOF();
             break;
          case 0:
-            byteCompleted = hdfsClient_[1].trafHdfsReadBuffer(); 
+            bytesRead = hdfsClient_[1].trafHdfsReadBuffer(); 
             bufNo = 1;
             rangeNo = hdfsClient_[1].getRangeNo();
             isEOF = hdfsClient_[1].isEOF();
             break;
          default:
             bufNo = -1;
-            byteCompleted = -1;
+            bytesRead = -1;
             rangeNo = -1;
             isEOF = 0;
       }    
-      lastBufCompleted_ = bufNo;
-      retArray[0] = byteCompleted;
+      retArray[0] = bytesRead;
       retArray[1] = bufNo;
       retArray[2] = rangeNo; 
       retArray[3] = isEOF;
       if (logger_.isDebugEnabled())
          logger_.debug(" Range No " + retArray[2] + " Buffer No " + retArray[1] + " Bytes Read " + retArray[0] + " isEOF " + retArray[3]); 
       lastBufCompleted_ = bufNo;
-      if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1))) 
-         lastScanRangeScheduled_ = true;
-      if (lastScanRangeScheduled_) {
+      if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1))) {
          scanCompleted_ = true;
-         return retArray; 
+         return retArray;
       }
       switch (lastBufCompleted_)
       {
          case 0:
-            hdfsScanRange(1);
+            hdfsScanRange(1, bytesRead);
             break;
          case 1:
-            hdfsScanRange(0);
+            hdfsScanRange(0, bytesRead);
             break;            
          default:
             break;
@@ -257,10 +256,20 @@ public class HdfsScan
       hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len);
       int[] retArray;
       int bytesCompleted;
+      ByteBuffer buf;
       while (true) {
          retArray = hdfsScan.trafHdfsRead();
          if (retArray == null)
             break;
+         System.out.println("Range No:" + retArray[2] + " Buf No:" + retArray[1] + " Bytes Completed:" + retArray[0] + " EOF:" + retArray[3]);
+         if (retArray[1] == 0)
+            buf = buf1;
+         else
+            buf = buf2; 
+         buf.position(0);
+         for (int i = 0; i < 50; i++)
+           System.out.print(buf.get());
+         System.out.println("");
       }
       long time2 = System.currentTimeMillis();
       HdfsScan.shutdown();
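
Taken together, trafHdfsRead() and hdfsScanRange(bufNo, bytesCompleted)
implement a double-buffering protocol: as soon as one buffer's read
completes, the read into the other buffer is scheduled, so the SQL engine
consumes one buffer while the next is being filled. A stripped-down sketch
of that ping-pong pattern (the executor, task, and buffer names are
illustrative, not the Trafodion classes):

   import java.nio.ByteBuffer;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   public class PingPongSketch
   {
      static ExecutorService pool = Executors.newSingleThreadExecutor();

      // Stand-in for HDFSClient: pretends to fill the buffer asynchronously.
      static Future<Integer> scheduleRead(ByteBuffer buf) {
         return pool.submit(() -> buf.capacity());
      }

      public static void main(String[] args) throws Exception
      {
         ByteBuffer[] buf = { ByteBuffer.allocateDirect(1 << 20),
                              ByteBuffer.allocateDirect(1 << 20) };
         int cur = 0;
         Future<Integer> pending = scheduleRead(buf[cur]);  // prime buffer 0
         for (int step = 0; step < 4; step++) {
            int bytesRead = pending.get();                  // wait for current buffer
            int other = 1 - cur;
            pending = scheduleRead(buf[other]);             // overlap the next read
            // ... process buf[cur] here while buf[other] is being filled ...
            System.out.println("buf " + cur + " completed with " + bytesRead + " bytes");
            cur = other;
         }
         pool.shutdown();
      }
   }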