You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by se...@apache.org on 2018/02/16 20:00:37 UTC
[4/9] trafodion git commit: [TRAFODION-2917] Refactor Trafodion
implementation of hdfs scan for text format hive tables
[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text format hive tables
Part-3 changes
Changes to ensure that multiple chunks and ranges of the hdfs scan
work fine with the refactored code.
Pending issues:
Statistics need to be populated.
ESP should be assigned the ranges in advance to avoid duplicate scans
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/202a040e
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/202a040e
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/202a040e
Branch: refs/heads/master
Commit: 202a040ea9eb8ae3ea79329ee14048c4fe2f082c
Parents: f17e15e
Author: selvaganesang <se...@esgyn.com>
Authored: Fri Feb 2 07:46:48 2018 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Fri Feb 2 07:46:48 2018 +0000
----------------------------------------------------------------------
core/sql/executor/ExHdfsScan.cpp | 54 ++++++++----
core/sql/executor/ExHdfsScan.h | 5 +-
.../main/java/org/trafodion/sql/HDFSClient.java | 86 +++++++-------------
.../main/java/org/trafodion/sql/HdfsScan.java | 69 +++++++++-------
4 files changed, 113 insertions(+), 101 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index e29baf6..730f0dc 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -143,7 +143,7 @@ ExHdfsScanTcb::ExHdfsScanTcb(
}
bufBegin_ = NULL;
bufEnd_ = NULL;
- logicalBufEnd_ = NULL;
+ bufLogicalEnd_ = NULL;
headRoomCopied_ = 0;
prevRangeNum_ = -1;
currRangeBytesRead_ = 0;
@@ -567,11 +567,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
}
bufBegin_ = NULL;
bufEnd_ = NULL;
- logicalBufEnd_ = NULL;
+ bufLogicalEnd_ = NULL;
headRoomCopied_ = 0;
prevRangeNum_ = -1;
currRangeBytesRead_ = 0;
recordSkip_ = FALSE;
+ extraBytesRead_ = 0;
step_ = TRAF_HDFS_READ;
}
break;
@@ -589,25 +590,28 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
break;
}
// Assign the starting address of the buffer
+ hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
- if (retArray_[IS_EOF])
- logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
- else if (retArray_[BYTES_COMPLETED] < hdfsScanBufMaxSize_)
- logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - headRoom_;
- else
- logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
- hdfo_ = getRange(retArray_[RANGE_NO]);
if (retArray_[RANGE_NO] != prevRangeNum_) {
+ currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
- if (hdfo_->getStartOffset() == 0)
+ if (hdfo->getStartOffset() == 0)
recordSkip_ = FALSE;
else
recordSkip_ = TRUE;
} else {
+ currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
recordSkip_ = FALSE;
- }
+ }
+ if (currRangeBytesRead_ > hdfo->getBytesToRead())
+ extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead();
+ else
+ extraBytesRead_ = 0;
+ bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
+
prevRangeNum_ = retArray_[RANGE_NO];
+ headRoomCopied_ = 0;
if (recordSkip_) {
hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
hdfsScanTdb().recordDelimiter_,
@@ -628,6 +632,13 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
break;
case COPY_TAIL_TO_HEAD:
{
+ BYTE *headRoomStartAddr;
+ headRoomCopied_ = bufEnd_ - (BYTE *)hdfsBufNextRow_;
+ if (retArray_[BUF_NO] == 0)
+ headRoomStartAddr = hdfsScanBuf_[1].buf_ - headRoomCopied_;
+ else
+ headRoomStartAddr = hdfsScanBuf_[0].buf_ - headRoomCopied_;
+ memcpy(headRoomStartAddr, hdfsBufNextRow_, headRoomCopied_);
step_ = TRAF_HDFS_READ;
}
break;
@@ -1023,6 +1034,10 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
case PROCESS_HDFS_ROW:
{
+ if (!useLibhdfsScan_ && hdfsBufNextRow_ == NULL) {
+ step_ = TRAF_HDFS_READ;
+ break;
+ }
exception_ = FALSE;
nextStep_ = NOT_STARTED;
debugPenultimatePrevRow_ = debugPrevRow_;
@@ -1066,9 +1081,20 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
}
else
{
- numBytesProcessedInRange_ +=
+ if (useLibhdfsScan_) {
+ numBytesProcessedInRange_ +=
startOfNextRow - hdfsBufNextRow_;
- hdfsBufNextRow_ = startOfNextRow;
+ hdfsBufNextRow_ = startOfNextRow;
+ }
+ else {
+ if ((BYTE *)startOfNextRow >= bufLogicalEnd_) {
+ step_ = TRAF_HDFS_READ;
+ hdfsBufNextRow_ = NULL;
+ }
+ else
+ hdfsBufNextRow_ = startOfNextRow;
+ }
+
}
if (exception_)
@@ -1691,7 +1717,7 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err,
}
else {
sourceDataEnd = (const char *)bufEnd_;
- endOfRequestedRange = (const char *)logicalBufEnd_;
+ endOfRequestedRange = NULL;
}
hdfsLoggingRow_ = hdfsBufNextRow_;
if (asciiSourceTD->numAttrs() == 0)
http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/executor/ExHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h
index 62bb11e..2570a58 100644
--- a/core/sql/executor/ExHdfsScan.h
+++ b/core/sql/executor/ExHdfsScan.h
@@ -119,7 +119,7 @@ private:
a) filename
b) offset
c) len
- Java layer always reads more than the len by rangeTailIOSize_ to accomdate the record split
+ Java layer always reads more than the len by rangeTailIOSize_ to accommdate the record split
2. Two ByteBuffer objects are also passsed to HdfsScan object. These ByteBuffers are backed up by
2 native buffers where the data is fetched. The buffer has a head room of size rangeTailIOSize_ and the
data is always read after the head room.
@@ -347,11 +347,12 @@ protected:
int retArray_[4];
BYTE *bufBegin_;
BYTE *bufEnd_;
- BYTE *logicalBufEnd_;
+ BYTE *bufLogicalEnd_;
long currRangeBytesRead_;
int headRoomCopied_;
int headRoom_;
int prevRangeNum_;
+ int extraBytesRead_;
NABoolean recordSkip_;
};
http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 1af2c49..3b83c8f 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -87,31 +87,45 @@ public class HDFSClient
class HDFSRead implements Callable
{
- int length_;
-
- HDFSRead(int length)
+ HDFSRead()
{
- length_ = length;
}
public Object call() throws IOException
{
int bytesRead;
- if (buf_.hasArray())
- bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, length_);
- else
+ int totalBytesRead = 0;
+ if (! buf_.hasArray())
+ fsdis_.seek(pos_);
+ do
{
- buf_.limit(bufOffset_ + length_);
- bytesRead = fsdis_.read(buf_);
- }
- return new Integer(bytesRead);
+ if (buf_.hasArray())
+ bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_);
+ else
+ bytesRead = fsdis_.read(buf_);
+ if (bytesRead == -1) {
+ isEOF_ = 1;
+ break;
+ }
+ if (bytesRead == 0)
+ break;
+ totalBytesRead += bytesRead;
+ if (totalBytesRead == bufLen_)
+ break;
+ bufOffset_ += bytesRead;
+ pos_ += bytesRead;
+ lenRemain_ -= bytesRead;
+ } while (lenRemain_ > 0);
+ return new Integer(totalBytesRead);
}
}
public HDFSClient()
{
}
-
+
+ // This constructor enables the hdfs data to be read in another thread while the previously
+ // read buffer is being processed by the SQL engine
public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
{
bufNo_ = bufNo;
@@ -127,44 +141,24 @@ public class HDFSClient
len_ = length;
if (buffer.hasArray())
bufLen_ = buffer.array().length;
- else
- {
+ else {
bufLen_ = buffer.capacity();
buf_.position(0);
}
lenRemain_ = (len_ > bufLen_) ? bufLen_ : len_;
- if (lenRemain_ != 0)
- {
- int readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
- future_ = executorService_.submit(new HDFSRead(readLength));
+ if (lenRemain_ != 0) {
+ future_ = executorService_.submit(new HDFSRead());
}
}
- public int trafHdfsRead() throws IOException, InterruptedException, ExecutionException
+ public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
{
Integer retObject = 0;
int bytesRead;
- int readLength;
-
- if (lenRemain_ == 0)
- return 0;
retObject = (Integer)future_.get();
bytesRead = retObject.intValue();
- if (bytesRead == -1)
- return -1;
- bufOffset_ += bytesRead;
- pos_ += bytesRead;
- lenRemain_ -= bytesRead;
- if (bufOffset_ == bufLen_)
- return bytesRead;
- else if (bufOffset_ > bufLen_)
- throw new IOException("Internal Error in trafHdfsRead ");
- if (lenRemain_ == 0)
- return bytesRead;
- readLength = (lenRemain_ > blockSize_) ? blockSize_ : lenRemain_;
- future_ = executorService_.submit(new HDFSRead(readLength));
return bytesRead;
- }
+ }
public int getRangeNo()
{
@@ -176,24 +170,6 @@ public class HDFSClient
return isEOF_;
}
- public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
- {
- int bytesRead;
- int totalBytesRead = 0;
- while (true) {
- bytesRead = trafHdfsRead();
- if (bytesRead == -1) {
- isEOF_ = 1;
- return totalBytesRead;
- }
- if (bytesRead == 0)
- return totalBytesRead;
- totalBytesRead += bytesRead;
- if (totalBytesRead == bufLen_)
- return totalBytesRead;
- }
- }
-
boolean hdfsCreate(String fname , boolean compress) throws IOException
{
if (logger_.isDebugEnabled())
http://git-wip-us.apache.org/repos/asf/trafodion/blob/202a040e/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index 9fb145e..73ceda8 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -65,7 +65,6 @@ public class HdfsScan
private long lenRemain_;
private int lastBufCompleted_ = -1;
private boolean scanCompleted_;
- private boolean lastScanRangeScheduled_;
class HdfsScanRange
{
@@ -113,41 +112,44 @@ public class HdfsScan
}
if (hdfsScanRanges_.length > 0) {
currRange_ = 0;
- currPos_ = hdfsScanRanges_[0].pos_;
- lenRemain_ = hdfsScanRanges_[0].len_;
- hdfsScanRange(0);
+ currPos_ = hdfsScanRanges_[currRange_].pos_;
+ lenRemain_ = hdfsScanRanges_[currRange_].len_;
+ hdfsScanRange(0, 0);
}
scanCompleted_ = false;
- lastScanRangeScheduled_ = false;
}
- public void hdfsScanRange(int bufNo) throws IOException
+ public void hdfsScanRange(int bufNo, int bytesCompleted) throws IOException
{
- if (logger_.isDebugEnabled())
- logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo);
+ lenRemain_ -= bytesCompleted;
+ currPos_ += bytesCompleted;
int readLength;
- if (lenRemain_ > bufLen_[bufNo])
- readLength = bufLen_[bufNo];
- else
- readLength = (int)lenRemain_;
- hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
- lenRemain_ -= readLength;
- currPos_ += readLength;
- if (lenRemain_ == 0) {
- if (currRange_ == (hdfsScanRanges_.length-1))
- lastScanRangeScheduled_ = true;
+ if (lenRemain_ <= 0) {
+ if (currRange_ == (hdfsScanRanges_.length-1)) {
+ scanCompleted_ = true;
+ return;
+ }
else {
currRange_++;
currPos_ = hdfsScanRanges_[currRange_].pos_;
lenRemain_ = hdfsScanRanges_[currRange_].len_;
}
}
+ if (lenRemain_ > bufLen_[bufNo])
+ readLength = bufLen_[bufNo];
+ else
+ readLength = (int)lenRemain_;
+ if (! scanCompleted_) {
+ if (logger_.isDebugEnabled())
+ logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo);
+ hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
+ }
}
public int[] trafHdfsRead() throws IOException, InterruptedException, ExecutionException
{
int[] retArray;
- int byteCompleted;
+ int bytesRead;
int bufNo;
int rangeNo;
int isEOF;
@@ -160,44 +162,41 @@ public class HdfsScan
switch (lastBufCompleted_) {
case -1:
case 1:
- byteCompleted = hdfsClient_[0].trafHdfsReadBuffer();
+ bytesRead = hdfsClient_[0].trafHdfsReadBuffer();
bufNo = 0;
rangeNo = hdfsClient_[0].getRangeNo();
isEOF = hdfsClient_[0].isEOF();
break;
case 0:
- byteCompleted = hdfsClient_[1].trafHdfsReadBuffer();
+ bytesRead = hdfsClient_[1].trafHdfsReadBuffer();
bufNo = 1;
rangeNo = hdfsClient_[1].getRangeNo();
isEOF = hdfsClient_[1].isEOF();
break;
default:
bufNo = -1;
- byteCompleted = -1;
+ bytesRead = -1;
rangeNo = -1;
isEOF = 0;
}
- lastBufCompleted_ = bufNo;
- retArray[0] = byteCompleted;
+ retArray[0] = bytesRead;
retArray[1] = bufNo;
retArray[2] = rangeNo;
retArray[3] = isEOF;
if (logger_.isDebugEnabled())
logger_.debug(" Range No " + retArray[2] + " Buffer No " + retArray[1] + " Bytes Read " + retArray[0] + " isEOF " + retArray[3]);
lastBufCompleted_ = bufNo;
- if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1)))
- lastScanRangeScheduled_ = true;
- if (lastScanRangeScheduled_) {
+ if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1))) {
scanCompleted_ = true;
- return retArray;
+ return retArray;
}
switch (lastBufCompleted_)
{
case 0:
- hdfsScanRange(1);
+ hdfsScanRange(1, bytesRead);
break;
case 1:
- hdfsScanRange(0);
+ hdfsScanRange(0, bytesRead);
break;
default:
break;
@@ -257,10 +256,20 @@ public class HdfsScan
hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len);
int[] retArray;
int bytesCompleted;
+ ByteBuffer buf;
while (true) {
retArray = hdfsScan.trafHdfsRead();
if (retArray == null)
break;
+ System.out.println("Range No:" + retArray[2] + " Buf No:" + retArray[1] + " Bytes Completed:" + retArray[0] + " EOF:" + retArray[3]);
+ if (retArray[1] == 0)
+ buf = buf1;
+ else
+ buf = buf2;
+ buf.position(0);
+ for (int i = 0; i < 50; i++)
+ System.out.print(buf.get());
+ System.out.println("");
}
long time2 = System.currentTimeMillis();
HdfsScan.shutdown();