You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by se...@apache.org on 2018/02/16 20:00:36 UTC
[3/9] trafodion git commit: [TRAFODION-2917] Refactor Trafodion
implementation of hdfs scan for text formatted hive tables
[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables
Part-2 changes
Introduced a new CQD USE_LIBHDFS_SCAN 'OFF' to switch to the new implementation
The new implementation details are at executor/ExHdfsScan.h
Fixed a bug that caused unexpected errors in the JVM when the JNI object corresponding
to a Java class was not initialized correctly
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/f17e15ee
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/f17e15ee
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/f17e15ee
Branch: refs/heads/master
Commit: f17e15eed741a40a41eec6a1a206dd661589623c
Parents: 60db153
Author: selvaganesang <se...@esgyn.com>
Authored: Wed Jan 31 19:48:49 2018 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Wed Jan 31 19:48:49 2018 +0000
----------------------------------------------------------------------
core/sql/comexe/ComTdbHdfsScan.h | 10 +-
core/sql/executor/ExExeUtilGet.cpp | 16 +-
core/sql/executor/ExExeUtilLoad.cpp | 10 +-
core/sql/executor/ExFastTransport.cpp | 2 -
core/sql/executor/ExFastTransport.h | 2 +-
core/sql/executor/ExHbaseAccess.cpp | 10 +-
core/sql/executor/ExHbaseIUD.cpp | 5 -
core/sql/executor/ExHdfsScan.cpp | 185 ++++++++++++++---
core/sql/executor/ExHdfsScan.h | 56 ++++++
core/sql/executor/HBaseClient_JNI.cpp | 21 +-
core/sql/executor/HBaseClient_JNI.h | 12 +-
core/sql/executor/HdfsClient_JNI.cpp | 198 +++++++++++++++----
core/sql/executor/HdfsClient_JNI.h | 28 +--
core/sql/executor/JavaObjectInterface.cpp | 14 +-
core/sql/executor/JavaObjectInterface.h | 24 +--
core/sql/executor/OrcFileReader.cpp | 3 +-
core/sql/executor/SequenceFileReader.cpp | 6 +-
core/sql/exp/ExpErrorEnums.h | 1 +
core/sql/exp/ExpHbaseInterface.cpp | 20 +-
core/sql/exp/ExpHbaseInterface.h | 12 +-
core/sql/exp/ExpLOBinterface.h | 13 --
core/sql/generator/GenRelScan.cpp | 3 +
core/sql/qmscommon/QRLogger.cpp | 1 +
core/sql/qmscommon/QRLogger.h | 1 +
core/sql/sqlcomp/DefaultConstants.h | 3 +
core/sql/sqlcomp/nadefaults.cpp | 2 +
.../main/java/org/trafodion/sql/HDFSClient.java | 22 ++-
.../main/java/org/trafodion/sql/HdfsScan.java | 35 +++-
28 files changed, 511 insertions(+), 204 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/comexe/ComTdbHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h
index 1d65bca..ff692c9 100755
--- a/core/sql/comexe/ComTdbHdfsScan.h
+++ b/core/sql/comexe/ComTdbHdfsScan.h
@@ -24,7 +24,7 @@
#define COM_HDFS_SCAN_H
#include "ComTdb.h"
-//#include "hdfs.h" // tPort
+//#include "hdfs.h"
#include "ExpLOBinterface.h"
#include "ComQueue.h"
@@ -54,7 +54,8 @@ class ComTdbHdfsScan : public ComTdb
// ignore conversion errors and continue reading the next row.
CONTINUE_ON_ERROR = 0x0020,
LOG_ERROR_ROWS = 0x0040,
- ASSIGN_RANGES_AT_RUNTIME = 0x0080
+ ASSIGN_RANGES_AT_RUNTIME = 0x0080,
+ USE_LIBHDFS_SCAN = 0x0100
};
// Expression to filter rows.
@@ -284,6 +285,11 @@ public:
{(v ? flags_ |= ASSIGN_RANGES_AT_RUNTIME : flags_ &= ~ASSIGN_RANGES_AT_RUNTIME); }
NABoolean getAssignRangesAtRuntime() const
{ return (flags_ & ASSIGN_RANGES_AT_RUNTIME) != 0; }
+
+ void setUseLibhdfsScan(NABoolean v)
+ {(v ? flags_ |= USE_LIBHDFS_SCAN : flags_ &= ~USE_LIBHDFS_SCAN); }
+ NABoolean getUseLibhdfsScan() const
+ { return (flags_ & USE_LIBHDFS_SCAN) != 0; }
UInt32 getMaxErrorRows() const { return maxErrorRows_;}
void setMaxErrorRows(UInt32 v ) { maxErrorRows_= v; }
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExExeUtilGet.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilGet.cpp b/core/sql/executor/ExExeUtilGet.cpp
index 6d675cb..539a8cf 100644
--- a/core/sql/executor/ExExeUtilGet.cpp
+++ b/core/sql/executor/ExExeUtilGet.cpp
@@ -3521,13 +3521,9 @@ ExExeUtilGetHbaseObjectsTcb::ExExeUtilGetHbaseObjectsTcb(
ex_globals * glob)
: ExExeUtilGetMetadataInfoTcb( exe_util_tdb, glob)
{
- int jniDebugPort = 0;
- int jniDebugTimeout = 0;
ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
(char*)exe_util_tdb.server(),
- (char*)exe_util_tdb.zkPort(),
- jniDebugPort,
- jniDebugTimeout);
+ (char*)exe_util_tdb.zkPort());
hbaseName_ = NULL;
hbaseNameBuf_ = new(getGlobals()->getDefaultHeap())
@@ -6106,9 +6102,7 @@ ExExeUtilRegionStatsTcb::ExExeUtilRegionStatsTcb(
int jniDebugTimeout = 0;
ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
(char*)"", //exe_util_tdb.server(),
- (char*)"", //exe_util_tdb.zkPort(),
- jniDebugPort,
- jniDebugTimeout);
+ (char*)""); //exe_util_tdb.zkPort(),
regionInfoList_ = NULL;
@@ -6879,13 +6873,9 @@ ExExeUtilClusterStatsTcb::ExExeUtilClusterStatsTcb(
stats_ = (ComTdbClusterStatsVirtTableColumnStruct*)statsBuf_;
- int jniDebugPort = 0;
- int jniDebugTimeout = 0;
ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
(char*)"", //exe_util_tdb.server(),
- (char*)"", //exe_util_tdb.zkPort(),
- jniDebugPort,
- jniDebugTimeout);
+ (char*)""); //exe_util_tdb.zkPort());
regionInfoList_ = NULL;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExExeUtilLoad.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp
index 819b3b1..0ebc65c 100644
--- a/core/sql/executor/ExExeUtilLoad.cpp
+++ b/core/sql/executor/ExExeUtilLoad.cpp
@@ -1245,9 +1245,7 @@ short ExExeUtilHBaseBulkLoadTcb::work()
int jniDebugTimeout = 0;
ehi_ = ExpHbaseInterface::newInstance(getGlobals()->getDefaultHeap(),
(char*)"", //Later may need to change to hblTdb.server_,
- (char*)"", //Later may need to change to hblTdb.zkPort_,
- jniDebugPort,
- jniDebugTimeout);
+ (char*)""); //Later may need to change to hblTdb.zkPort_);
retcode = ehi_->initHBLC();
if (retcode == 0)
retcode = ehi_->createCounterTable(hblTdb().getErrCountTable(), (char *)"ERRORS");
@@ -1983,13 +1981,9 @@ ExExeUtilHBaseBulkUnLoadTcb::ExExeUtilHBaseBulkUnLoadTcb(
oneFile_(FALSE)
{
hdfsClient_ = NULL;
- int jniDebugPort = 0;
- int jniDebugTimeout = 0;
ehi_ = ExpHbaseInterface::newInstance(getGlobals()->getDefaultHeap(),
(char*)"", //Later may need to change to hblTdb.server_,
- (char*)"", //Later may need to change to hblTdb.zkPort_,
- jniDebugPort,
- jniDebugTimeout);
+ (char*)""); //Later may need to change to hblTdb.zkPort_);
qparent_.down->allocatePstate(this);
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExFastTransport.cpp b/core/sql/executor/ExFastTransport.cpp
index bdde201..3a26467 100644
--- a/core/sql/executor/ExFastTransport.cpp
+++ b/core/sql/executor/ExFastTransport.cpp
@@ -1291,8 +1291,6 @@ void ExHdfsFastExtractTcb::createHdfsClientFileError(Int32 hdfsClientRetCode)
NULL, NULL, NULL, NULL,
errorMsg,
(char *)currContext->getJniErrorStr().data());
- //ex_queue_entry *pentry_down = qParent_.down->getHeadEntry();
- //pentry_down->setDiagsArea(diagsArea);
updateWorkATPDiagsArea(diagsArea);
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExFastTransport.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExFastTransport.h b/core/sql/executor/ExFastTransport.h
index 94b091d..5bf1219 100644
--- a/core/sql/executor/ExFastTransport.h
+++ b/core/sql/executor/ExFastTransport.h
@@ -408,7 +408,7 @@ protected:
NABoolean isSequenceFile();
void createSequenceFileError(Int32 sfwRetCode);
- void createHdfsClientFileError(Int32 sfwRetCode);
+ void createHdfsClientFileError(Int32 hdfsClientRetCode);
NABoolean isHdfsCompressed();
NABoolean getEmptyNullString()
{
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExHbaseAccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseAccess.cpp b/core/sql/executor/ExHbaseAccess.cpp
index 42fd86e..2247b9a 100644
--- a/core/sql/executor/ExHbaseAccess.cpp
+++ b/core/sql/executor/ExHbaseAccess.cpp
@@ -358,15 +358,9 @@ ExHbaseAccessTcb::ExHbaseAccessTcb(
registerSubtasks();
registerResizeSubtasks();
- int jniDebugPort = 0;
- int jniDebugTimeout = 0;
ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
- // (char*)"localhost",
(char*)hbaseAccessTdb.server_,
- // (char*)"2181",
- (char*)hbaseAccessTdb.zkPort_,
- jniDebugPort,
- jniDebugTimeout);
+ (char*)hbaseAccessTdb.zkPort_);
asciiRow_ = NULL;
asciiRowMissingCols_ = NULL;
@@ -508,6 +502,8 @@ void ExHbaseAccessTcb::freeResources()
NADELETEBASIC(colVal_.val, getHeap());
if (hdfsClient_ != NULL)
NADELETE(hdfsClient_, HdfsClient, getHeap());
+ if (loggingFileName_ != NULL)
+ NADELETEBASIC(loggingFileName_, getHeap());
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExHbaseIUD.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp
index 3bc1d93..e8896b2 100644
--- a/core/sql/executor/ExHbaseIUD.cpp
+++ b/core/sql/executor/ExHbaseIUD.cpp
@@ -1158,16 +1158,11 @@ ExHbaseAccessBulkLoadPrepSQTcb::ExHbaseAccessBulkLoadPrepSQTcb(
"traf_upsert_err",
fileNum,
loggingFileName_);
- loggingFileCreated_ = FALSE;
loggingRow_ = new(glob->getDefaultHeap()) char[hbaseAccessTdb.updateRowLen_];
}
ExHbaseAccessBulkLoadPrepSQTcb::~ExHbaseAccessBulkLoadPrepSQTcb()
{
- if (loggingFileName_ != NULL) {
- NADELETEBASIC(loggingFileName_, getHeap());
- loggingFileName_ = NULL;
- }
// Flush and close sample file if used
if (hdfs_)
{
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index 90ac737..e29baf6 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -120,15 +120,35 @@ ExHdfsScanTcb::ExHdfsScanTcb(
, loggingErrorDiags_(NULL)
, loggingFileName_(NULL)
, hdfsClient_(NULL)
+ , hdfsScan_(NULL)
, hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries())
{
Space * space = (glob ? glob->getSpace() : 0);
CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
+ useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
lobGlob_ = NULL;
- const int readBufSize = (Int32)hdfsScanTdb.hdfsBufSize_;
- hdfsScanBuffer_ = new(space) char[ readBufSize + 1 ];
- hdfsScanBuffer_[readBufSize] = '\0';
-
+ hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
+ headRoom_ = (Int32)hdfsScanTdb.rangeTailIOSize_;
+
+ if (useLibhdfsScan_) {
+ hdfsScanBuffer_ = new(heap) char[ hdfsScanBufMaxSize_ + 1 ];
+ hdfsScanBuffer_[hdfsScanBufMaxSize_] = '\0';
+ } else {
+ hdfsScanBufBacking_[0] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * (headRoom_)];
+ hdfsScanBufBacking_[1] = new (heap) BYTE[hdfsScanBufMaxSize_ + 2 * (headRoom_)];
+ for (int i=0; i < 2; i++) {
+ BYTE *hdfsScanBufBacking = hdfsScanBufBacking_[i];
+ hdfsScanBuf_[i].headRoom_ = hdfsScanBufBacking;
+ hdfsScanBuf_[i].buf_ = hdfsScanBufBacking + headRoom_;
+ }
+ bufBegin_ = NULL;
+ bufEnd_ = NULL;
+ logicalBufEnd_ = NULL;
+ headRoomCopied_ = 0;
+ prevRangeNum_ = -1;
+ currRangeBytesRead_ = 0;
+ recordSkip_ = FALSE;
+ }
moveExprColsBuffer_ = new(space) ExSimpleSQLBuffer( 1, // one row
(Int32)hdfsScanTdb.moveExprColsRowLength_,
space);
@@ -202,9 +222,7 @@ ExHdfsScanTcb::ExHdfsScanTcb(
int jniDebugTimeout = 0;
ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
(char*)"", //Later replace with server cqd
- (char*)"", ////Later replace with port cqd
- jniDebugPort,
- jniDebugTimeout);
+ (char*)"");
// Populate the hdfsInfo list into an array to gain o(1) lookup access
Queue* hdfsInfoList = hdfsScanTdb.getHdfsFileInfoList();
@@ -238,9 +256,9 @@ void ExHdfsScanTcb::freeResources()
deallocateAtp(workAtp_, getSpace());
workAtp_ = NULL;
}
- if (hdfsScanBuffer_)
+ if (hdfsScanBuffer_ )
{
- NADELETEBASIC(hdfsScanBuffer_, getSpace());
+ NADELETEBASIC(hdfsScanBuffer_, getHeap());
hdfsScanBuffer_ = NULL;
}
if (hdfsAsciiSourceBuffer_)
@@ -287,6 +305,8 @@ void ExHdfsScanTcb::freeResources()
}
if (hdfsClient_ != NULL)
NADELETE(hdfsClient_, HdfsClient, getHeap());
+ if (hdfsScan_ != NULL)
+ NADELETE(hdfsScan_, HdfsScan, getHeap());
}
NABoolean ExHdfsScanTcb::needStatsEntry()
@@ -384,10 +404,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
HdfsFileInfo *hdfo = NULL;
Lng32 openType = 0;
int changedLen = 0;
- ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
- hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
- hdfsFileInfo *dirInfo = NULL;
- Int32 hdfsErrorDetail = 0;//this is errno returned form underlying hdfsOpenFile call.
+ ContextCli *currContext = getGlobals()->castToExExeStmtGlobals()->getCliGlobals()->currContext();
+ hdfsFS hdfs = currContext->getHdfsServerConnection(hdfsScanTdb().hostName_,hdfsScanTdb().port_);
+ hdfsFileInfo *dirInfo = NULL;
+ Int32 hdfsErrorDetail = 0;//this is errno returned from underlying hdfsOpenFile call.
+ HDFS_Scan_RetCode hdfsScanRetCode;
+
while (!qparent_.down->isEmpty())
{
ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
@@ -442,8 +464,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
case ASSIGN_RANGES_AT_RUNTIME:
computeRangesAtRuntime();
currRangeNum_ = beginRangeNum_;
- if (numRanges_ > 0)
- step_ = INIT_HDFS_CURSOR;
+ if (numRanges_ > 0) {
+ if (useLibhdfsScan_)
+ step_ = INIT_HDFS_CURSOR;
+ else
+ step_ = SETUP_HDFS_SCAN;
+ }
else
step_ = DONE;
break;
@@ -518,11 +544,93 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
if (step_ == CHECK_FOR_DATA_MOD_AND_DONE)
step_ = DONE;
- else
- step_ = INIT_HDFS_CURSOR;
+ else {
+ if (useLibhdfsScan_)
+ step_ = INIT_HDFS_CURSOR;
+ else
+ step_ = SETUP_HDFS_SCAN;
+ }
}
break;
-
+ case SETUP_HDFS_SCAN:
+ {
+ if (hdfsScan_ != NULL)
+ NADELETE(hdfsScan_, HdfsScan, getHeap());
+ hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,
+ &hdfsFileInfoListAsArray_, hdfsScanTdb().rangeTailIOSize_, hdfsScanRetCode);
+ if (hdfsScanRetCode != HDFS_SCAN_OK)
+ {
+ setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN",
+ currContext->getJniErrorStr(), NULL);
+ step_ = HANDLE_ERROR_AND_DONE;
+ break;
+ }
+ bufBegin_ = NULL;
+ bufEnd_ = NULL;
+ logicalBufEnd_ = NULL;
+ headRoomCopied_ = 0;
+ prevRangeNum_ = -1;
+ currRangeBytesRead_ = 0;
+ recordSkip_ = FALSE;
+ step_ = TRAF_HDFS_READ;
+ }
+ break;
+ case TRAF_HDFS_READ:
+ {
+ hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), retArray_, sizeof(retArray_)/sizeof(int));
+ if (hdfsScanRetCode == HDFS_SCAN_EOR) {
+ step_ = DONE;
+ break;
+ }
+ else if (hdfsScanRetCode != HDFS_SCAN_OK) {
+ setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN",
+ currContext->getJniErrorStr(), NULL);
+ step_ = HANDLE_ERROR_AND_DONE;
+ break;
+ }
+ // Assign the starting address of the buffer
+ bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
+ if (retArray_[IS_EOF])
+ logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
+ else if (retArray_[BYTES_COMPLETED] < hdfsScanBufMaxSize_)
+ logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - headRoom_;
+ else
+ logicalBufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
+ hdfo_ = getRange(retArray_[RANGE_NO]);
+ if (retArray_[RANGE_NO] != prevRangeNum_) {
+ bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_;
+ if (hdfo_->getStartOffset() == 0)
+ recordSkip_ = FALSE;
+ else
+ recordSkip_ = TRUE;
+ } else {
+ bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
+ recordSkip_ = FALSE;
+ }
+ prevRangeNum_ = retArray_[RANGE_NO];
+ if (recordSkip_) {
+ hdfsBufNextRow_ = hdfs_strchr((char *)bufBegin_,
+ hdfsScanTdb().recordDelimiter_,
+ (char *)bufEnd_,
+ checkRangeDelimiter_,
+ hdfsScanTdb().getHiveScanMode(), &changedLen);
+ if (hdfsBufNextRow_ == NULL) {
+ setupError(8446, 0, "No record delimiter found in buffer from hdfsRead",
+ NULL, NULL);
+ step_ = HANDLE_ERROR_AND_DONE;
+ break;
+ }
+ }
+ else
+ hdfsBufNextRow_ = (char *)bufBegin_;
+ step_ = PROCESS_HDFS_ROW;
+ }
+ break;
+ case COPY_TAIL_TO_HEAD:
+ {
+ step_ = TRAF_HDFS_READ;
+ }
+ break;
case INIT_HDFS_CURSOR:
{
hdfo_ = getRange(currRangeNum_);
@@ -949,7 +1057,10 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
if (startOfNextRow == NULL)
{
- step_ = REPOS_HDFS_DATA;
+ if (useLibhdfsScan_)
+ step_ = REPOS_HDFS_DATA;
+ else
+ step_ = COPY_TAIL_TO_HEAD;
if (!exception_)
break;
}
@@ -1220,8 +1331,12 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
workAtp_->setDiagsArea(NULL); // get rid of warnings.
if (((pentry_down->downState.request == ex_queue::GET_N) &&
(pentry_down->downState.requestValue == matches_)) ||
- (pentry_down->downState.request == ex_queue::GET_NOMORE))
- step_ = CLOSE_HDFS_CURSOR;
+ (pentry_down->downState.request == ex_queue::GET_NOMORE)) {
+ if (useLibhdfsScan_)
+ step_ = CLOSE_HDFS_CURSOR;
+ else
+ step_ = DONE;
+ }
else
step_ = PROCESS_HDFS_ROW;
break;
@@ -1568,18 +1683,26 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err,
const char cd = hdfsScanTdb().columnDelimiter_;
const char rd = hdfsScanTdb().recordDelimiter_;
- const char *sourceDataEnd = hdfsScanBuffer_+trailingPrevRead_+ bytesRead_;
-
+ const char *sourceDataEnd;
+ const char *endOfRequestedRange;
+ if (useLibhdfsScan_) {
+ sourceDataEnd = hdfsScanBuffer_+trailingPrevRead_+ bytesRead_;
+ endOfRequestedRange = endOfRequestedRange_;
+ }
+ else {
+ sourceDataEnd = (const char *)bufEnd_;
+ endOfRequestedRange = (const char *)logicalBufEnd_;
+ }
hdfsLoggingRow_ = hdfsBufNextRow_;
if (asciiSourceTD->numAttrs() == 0)
{
sourceRowEnd = hdfs_strchr(sourceData, rd, sourceDataEnd, checkRangeDelimiter_, mode, &changedLen);
hdfsLoggingRowEnd_ = sourceRowEnd + changedLen;
- if (!sourceRowEnd)
- return NULL;
- if ((endOfRequestedRange_) &&
- (sourceRowEnd >= endOfRequestedRange_)) {
+ if (sourceRowEnd == NULL)
+ return NULL;
+ if ((endOfRequestedRange) &&
+ (sourceRowEnd >= endOfRequestedRange)) {
checkRangeDelimiter_ = TRUE;
*(sourceRowEnd +1)= RANGE_DELIMITER;
}
@@ -1623,8 +1746,8 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err,
if (rdSeen) {
sourceRowEnd = sourceColEnd + changedLen;
hdfsLoggingRowEnd_ = sourceRowEnd;
- if ((endOfRequestedRange_) &&
- (sourceRowEnd >= endOfRequestedRange_)) {
+ if ((endOfRequestedRange) &&
+ (sourceRowEnd >= endOfRequestedRange)) {
checkRangeDelimiter_ = TRUE;
*(sourceRowEnd +1)= RANGE_DELIMITER;
}
@@ -1697,8 +1820,8 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err,
sourceRowEnd = hdfs_strchr(sourceData, rd, sourceDataEnd, checkRangeDelimiter_,mode, &changedLen);
if (sourceRowEnd) {
hdfsLoggingRowEnd_ = sourceRowEnd + changedLen; //changedLen is when hdfs_strchr move the return pointer to remove the extra \r
- if ((endOfRequestedRange_) &&
- (sourceRowEnd >= endOfRequestedRange_ )) {
+ if ((endOfRequestedRange) &&
+ (sourceRowEnd >= endOfRequestedRange )) {
checkRangeDelimiter_ = TRUE;
*(sourceRowEnd +1)= RANGE_DELIMITER;
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/ExHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h
index 984fbb9..62bb11e 100644
--- a/core/sql/executor/ExHdfsScan.h
+++ b/core/sql/executor/ExHdfsScan.h
@@ -46,6 +46,8 @@
// -----------------------------------------------------------------------
class ExHdfsScanTdb;
class ExHdfsScanTcb;
+class HdfsScan;
+class HdfsClient;
// -----------------------------------------------------------------------
// Classes referenced in this file
@@ -108,9 +110,46 @@ private:
// ---------------------------------------------------------------------
};
+/*
+ USE_LIBHDFS_SCAN - OFF enables hdfs access via java classes
+ org.trafodion.sql.HdfsScan and org.trafodion.sql.HdfsClient
+ Steps involved:
+ 1. Create a new HdfsScan object and set the scan ranges of the fragment instance in it
+ The scan range involves the following and it is determined either at runtime or compile time
+ a) filename
+ b) offset
+ c) len
+ Java layer always reads more than the len by rangeTailIOSize_ to accommodate the record split
+ 2. Two ByteBuffer objects are also passed to HdfsScan object. These ByteBuffers are backed by
+ 2 native buffers where the data is fetched. The buffer has a head room of size rangeTailIOSize_ and the
+ data is always read after the head room.
+ 3. HdfsScan returns an int array containing bytesRead, bufNo, rangeNo, isEOF and schedules either
+ the remaining bytes to be read or the next range using ByteBuffers alternatively.
+ 4. HdfsScan returns null array when there is no more data to be read.
+ 5. When the data is processed in one ByteBuffer in the native thread, the data is fetched into the other ByteBuffer by
+ another Java thread.
+ 6. Native layer after processing all the rows in one ByteBuffer, moves the last incomplete row to head room of the
+ other ByteBuffer. Then it requests to check if the read is complete. The native layer processes the buffer starting
+ from the copied incomplete row.
+*/
+
class ExHdfsScanTcb : public ex_tcb
{
+
public:
+ enum
+ {
+ BYTES_COMPLETED,
+ BUF_NO,
+ RANGE_NO,
+ IS_EOF
+ } retArrayIndices_;
+
+ struct HDFS_SCAN_BUF
+ {
+ BYTE *headRoom_;
+ BYTE *buf_;
+ };
ExHdfsScanTcb( const ComTdbHdfsScan &tdb,
ex_globals *glob );
@@ -165,6 +204,9 @@ protected:
, DONE
, HANDLE_ERROR_WITH_CLOSE
, HANDLE_ERROR_AND_DONE
+ , SETUP_HDFS_SCAN
+ , TRAF_HDFS_READ
+ , COPY_TAIL_TO_HEAD
} step_,nextStep_;
/////////////////////////////////////////////////////
@@ -296,7 +338,21 @@ protected:
// this array is populated from the info list stored as Queue.
HdfsFileInfoArray hdfsFileInfoListAsArray_;
+
HdfsClient *hdfsClient_;
+ HdfsScan *hdfsScan_;
+ NABoolean useLibhdfsScan_;
+ BYTE *hdfsScanBufBacking_[2];
+ HDFS_SCAN_BUF hdfsScanBuf_[2];
+ int retArray_[4];
+ BYTE *bufBegin_;
+ BYTE *bufEnd_;
+ BYTE *logicalBufEnd_;
+ long currRangeBytesRead_;
+ int headRoomCopied_;
+ int headRoom_;
+ int prevRangeNum_;
+ NABoolean recordSkip_;
};
class ExOrcScanTcb : public ExHdfsScanTcb
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/HBaseClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HBaseClient_JNI.cpp b/core/sql/executor/HBaseClient_JNI.cpp
index fe56d94..6b400cd 100644
--- a/core/sql/executor/HBaseClient_JNI.cpp
+++ b/core/sql/executor/HBaseClient_JNI.cpp
@@ -114,8 +114,8 @@ static const char* const hbcErrorEnumStr[] =
//
//////////////////////////////////////////////////////////////////////////////
// private default constructor
-HBaseClient_JNI::HBaseClient_JNI(NAHeap *heap, int debugPort, int debugTimeout)
- : JavaObjectInterface(heap, debugPort, debugTimeout)
+HBaseClient_JNI::HBaseClient_JNI(NAHeap *heap)
+ : JavaObjectInterface(heap)
,isConnected_(FALSE)
{
for (int i=0; i<NUM_HBASE_WORKER_THREADS; i++) {
@@ -137,7 +137,7 @@ char* HBaseClient_JNI::getErrorText(HBC_RetCode errEnum)
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
-HBaseClient_JNI* HBaseClient_JNI::getInstance(int debugPort, int debugTimeout)
+HBaseClient_JNI* HBaseClient_JNI::getInstance()
{
ContextCli *currContext = GetCliGlobals()->currContext();
HBaseClient_JNI *hbaseClient_JNI = currContext->getHBaseClient();
@@ -145,8 +145,7 @@ HBaseClient_JNI* HBaseClient_JNI::getInstance(int debugPort, int debugTimeout)
{
NAHeap *heap = currContext->exHeap();
- hbaseClient_JNI = new (heap) HBaseClient_JNI(heap,
- debugPort, debugTimeout);
+ hbaseClient_JNI = new (heap) HBaseClient_JNI(heap);
currContext->setHbaseClient(hbaseClient_JNI);
}
return hbaseClient_JNI;
@@ -301,7 +300,8 @@ HBC_RetCode HBaseClient_JNI::init()
JavaMethods_[JM_TRUNCATE ].jm_name = "truncate";
JavaMethods_[JM_TRUNCATE ].jm_signature = "(Ljava/lang/String;ZJ)Z";
rc = (HBC_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (rc == HBC_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
@@ -1583,7 +1583,8 @@ HBLC_RetCode HBulkLoadClient_JNI::init()
JavaMethods_[JM_ADD_TO_HFILE_DB ].jm_signature = "(SLjava/lang/Object;Ljava/lang/Object;)Z";
rc = (HBLC_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (rc == HBLC_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
@@ -3208,7 +3209,8 @@ HTC_RetCode HTableClient_JNI::init()
JavaMethods_[JM_COMPLETE_PUT ].jm_signature = "(I[Z)Z";
rc = (HTC_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (rc == HTC_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
@@ -3798,7 +3800,8 @@ HVC_RetCode HiveClient_JNI::init()
JavaMethods_[JM_EXEC_HIVE_SQL].jm_name = "executeHiveSQL";
JavaMethods_[JM_EXEC_HIVE_SQL].jm_signature = "(Ljava/lang/String;)V";
rc = (HVC_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (rc == HVC_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/HBaseClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HBaseClient_JNI.h b/core/sql/executor/HBaseClient_JNI.h
index f6667d9..3177a7a 100644
--- a/core/sql/executor/HBaseClient_JNI.h
+++ b/core/sql/executor/HBaseClient_JNI.h
@@ -246,7 +246,7 @@ public:
std::string* getHTableName();
// Get the error description.
- virtual char* getErrorText(HTC_RetCode errEnum);
+ static char* getErrorText(HTC_RetCode errEnum);
void setTableName(const char *tableName)
{
@@ -429,7 +429,7 @@ typedef enum {
class HBaseClient_JNI : public JavaObjectInterface
{
public:
- static HBaseClient_JNI* getInstance(int debugPort, int debugTimeout);
+ static HBaseClient_JNI* getInstance();
static void deleteInstance();
// Destructor
@@ -488,7 +488,7 @@ public:
HBaseClientRequest* getHBaseRequest();
bool workerThreadsStarted() { return (threadID_[0] ? true : false); }
// Get the error description.
- virtual char* getErrorText(HBC_RetCode errEnum);
+ static char* getErrorText(HBC_RetCode errEnum);
static void logIt(const char* str);
@@ -542,7 +542,7 @@ public:
private:
// private default constructor
- HBaseClient_JNI(NAHeap *heap, int debugPort, int debugTimeout);
+ HBaseClient_JNI(NAHeap *heap);
NAArray<HbaseStr>* getKeys(Int32 funcIndex, NAHeap *heap, const char *tableName, bool useTRex);
private:
@@ -665,7 +665,7 @@ public:
HVC_RetCode executeHiveSQL(const char* hiveSQL);
// Get the error description.
- virtual char* getErrorText(HVC_RetCode errEnum);
+ static char* getErrorText(HVC_RetCode errEnum);
static void logIt(const char* str);
@@ -757,7 +757,7 @@ public:
HBLC_RetCode bulkLoadCleanup(const HbaseStr &tblName, const Text& location);
// Get the error description.
- virtual char* getErrorText(HBLC_RetCode errEnum);
+ static char* getErrorText(HBLC_RetCode errEnum);
private:
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp
index a3aef5a..63c4ac1 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -37,6 +37,11 @@ pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
static const char* const hdfsScanErrorEnumStr[] =
{
+ "Error in HdfsScan::setScanRanges"
+ ,"Java Exception in HdfsScan::setScanRanges"
+ ,"Error in HdfsScan::trafHdfsRead"
+ ,"Java Exceptiokn in HdfsScan::trafHdfsRead"
+ , "Hdfs scan End of Ranges"
};
@@ -62,13 +67,14 @@ HDFS_Scan_RetCode HdfsScan::init()
JavaMethods_[JM_CTOR ].jm_name = "<init>";
JavaMethods_[JM_CTOR ].jm_signature = "()V";
- JavaMethods_[JM_INIT_SCAN_RANGES].jm_name = "<init>";
- JavaMethods_[JM_INIT_SCAN_RANGES].jm_signature = "(Ljava/lang/Object;Ljava/lang/Object;[Ljava/lang/String;[J[J)V";
+ JavaMethods_[JM_SET_SCAN_RANGES].jm_name = "setScanRanges";
+ JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/lang/String;[J[J)V";
JavaMethods_[JM_TRAF_HDFS_READ].jm_name = "trafHdfsRead";
JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
rc = (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (rc == HDFS_SCAN_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
@@ -79,17 +85,131 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
if (errEnum < (HDFS_Scan_RetCode)JOI_LAST)
return JavaObjectInterface::getErrorText((JOI_RetCode)errEnum);
else
- return (char*)hdfsScanErrorEnumStr[errEnum-HDFS_SCAN_FIRST-1];
+ return (char*)hdfsScanErrorEnumStr[errEnum-HDFS_SCAN_FIRST];
}
-//////////////////////////////////////////////////////////////////////////////
-HDFS_Scan_RetCode HdfsScan::initScanRanges()
+
+/////////////////////////////////////////////////////////////////////////////
+HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
+ HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize)
{
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called.");
+
+ if (initJNIEnv() != JOI_OK)
+ return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
+
+ jobject j_buf1 = jenv_->NewDirectByteBuffer(hdfsScanBuf[0].buf_, scanBufSize);
+ if (j_buf1 == NULL) {
+ GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM));
+ jenv_->PopLocalFrame(NULL);
+ return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
+ }
+
+ jobject j_buf2 = jenv_->NewDirectByteBuffer(hdfsScanBuf[1].buf_, scanBufSize);
+ if (j_buf2 == NULL) {
+ GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM));
+ jenv_->PopLocalFrame(NULL);
+ return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
+ }
+ jobjectArray j_filenames = NULL;
+ jlongArray j_offsets = NULL;
+ jlongArray j_lens = NULL;
+ HdfsFileInfo *hdfo;
+ jstring j_obj;
+
+ HDFS_Scan_RetCode hdfsScanRetCode = HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
+ int arrayLen = hdfsFileInfoArray->entries();
+ for (int i = 0; i < arrayLen; i++) {
+ hdfo = hdfsFileInfoArray->at(i);
+ j_obj = jenv_->NewStringUTF(hdfo->fileName());
+ if (jenv_->ExceptionCheck()) {
+ jenv_->PopLocalFrame(NULL);
+ return hdfsScanRetCode;
+ }
+ if (j_filenames == NULL) {
+ j_filenames = jenv_->NewObjectArray(arrayLen, jenv_->GetObjectClass(j_obj), NULL);
+ if (jenv_->ExceptionCheck()) {
+ jenv_->PopLocalFrame(NULL);
+ return hdfsScanRetCode;
+ }
+ }
+ jenv_->SetObjectArrayElement(j_filenames, i, (jobject)j_obj);
+ jenv_->DeleteLocalRef(j_obj);
+ if (j_offsets == NULL) {
+ j_offsets = jenv_->NewLongArray(arrayLen);
+ if (jenv_->ExceptionCheck()) {
+ jenv_->PopLocalFrame(NULL);
+ return hdfsScanRetCode;
+ }
+ }
+ long offset = hdfo->getStartOffset();
+ jenv_->SetLongArrayRegion(j_offsets, i, 1, &offset);
+ if (j_lens == NULL) {
+ j_lens = jenv_->NewLongArray(arrayLen);
+ if (jenv_->ExceptionCheck()) {
+ jenv_->PopLocalFrame(NULL);
+ return hdfsScanRetCode;
+ }
+ }
+ long len = hdfo->getBytesToRead()+rangeTailIOSize;
+ jenv_->SetLongArrayRegion(j_lens, i, 1, &len);
+ }
+
+ jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_filenames, j_offsets, j_lens);
+
+ if (jenv_->ExceptionCheck()) {
+ getExceptionDetails();
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsScan::setScanRanges()", getLastError());
+ jenv_->PopLocalFrame(NULL);
+ return HDFS_SCAN_ERROR_SET_SCAN_RANGES_EXCEPTION;
+ }
+ jenv_->PopLocalFrame(NULL);
  return HDFS_SCAN_OK;
}
-HDFS_Scan_RetCode HdfsScan::trafHdfsRead()
+HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
+ HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize, HDFS_Scan_RetCode &hdfsScanRetCode)
{
- return HDFS_SCAN_OK;
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called.");
+
+ if (initJNIEnv() != JOI_OK) {
+ hdfsScanRetCode = HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
+ return NULL;
+ }
+ hdfsScanRetCode = HDFS_SCAN_OK;
+ HdfsScan *hdfsScan = new (heap) HdfsScan(heap);
+ if (hdfsScan != NULL) {
+ hdfsScanRetCode = hdfsScan->init();
+ if (hdfsScanRetCode == HDFS_SCAN_OK)
+ hdfsScanRetCode = hdfsScan->setScanRanges(heap, hdfsScanBuf, scanBufSize,
+ hdfsFileInfoArray, rangeTailIOSize);
+ if (hdfsScanRetCode != HDFS_SCAN_OK) {
+ NADELETE(hdfsScan, HdfsScan, heap);
+ hdfsScan = NULL;
+ }
+ }
+ return hdfsScan;
+}
+
+
+HDFS_Scan_RetCode HdfsScan::trafHdfsRead(NAHeap *heap, int retArray[], short arrayLen)
+{
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::trafHdfsRead() called.");
+
+ if (initJNIEnv() != JOI_OK)
+ return HDFS_SCAN_ERROR_TRAF_HDFS_READ_PARAM;
+
+ jintArray j_retArray = (jintArray)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_TRAF_HDFS_READ].methodID);
+ if (jenv_->ExceptionCheck()) {
+ getExceptionDetails();
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsScan::trafHdfsRead()", getLastError());
+ jenv_->PopLocalFrame(NULL);
+ return HDFS_SCAN_ERROR_TRAF_HDFS_READ_EXCEPTION;
+ }
+ if (j_retArray == NULL) {
+ jenv_->PopLocalFrame(NULL);
+ return HDFS_SCAN_EOR;
+ }
+ short retArrayLen = jenv_->GetArrayLength(j_retArray);
+ ex_assert(retArrayLen == arrayLen, "HdfsScan::trafHdfsRead() InternalError: retArrayLen != arrayLen");
+ jenv_->GetIntArrayRegion(j_retArray, 0, arrayLen, retArray);
+ jenv_->PopLocalFrame(NULL);
+ return HDFS_SCAN_OK;
}
// ===========================================================================
@@ -123,12 +243,15 @@ static const char* const hdfsClientErrorEnumStr[] =
//////////////////////////////////////////////////////////////////////////////
HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode)
{
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called.");
+
+ if (initJNIEnv() != JOI_OK)
+ return NULL;
retCode = HDFS_CLIENT_OK;
HdfsClient *hdfsClient = new (heap) HdfsClient(heap);
if (hdfsClient != NULL) {
retCode = hdfsClient->init();
- if (retCode != HDFS_CLIENT_OK)
- {
+ if (retCode != HDFS_CLIENT_OK) {
NADELETE(hdfsClient, HdfsClient, heap);
hdfsClient = NULL;
}
@@ -170,7 +293,8 @@ HDFS_Client_RetCode HdfsClient::init()
JavaMethods_[JM_HDFS_DELETE_PATH].jm_name = "hdfsDeletePath";
JavaMethods_[JM_HDFS_DELETE_PATH].jm_signature = "(Ljava/lang/String;)Z";
rc = (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (rc == HDFS_CLIENT_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
@@ -189,7 +313,7 @@ char* HdfsClient::getErrorText(HDFS_Client_RetCode errEnum)
HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean compress)
{
- QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "HdfsClient::hdfsCreate(%s) called.", path);
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCreate(%s) called.", path);
if (initJNIEnv() != JOI_OK)
return HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM;
@@ -209,15 +333,15 @@ HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean compress)
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
- logError(CAT_SQL_HBASE, __FILE__, __LINE__);
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsCreate()", getLastError());
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreate()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_CREATE_EXCEPTION;
}
if (jresult == false)
{
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsCreate()", getLastError());
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreate()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM;
}
@@ -231,7 +355,7 @@ HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean compress)
//////////////////////////////////////////////////////////////////////////////
HDFS_Client_RetCode HdfsClient::hdfsWrite(const char* data, Int64 len)
{
- QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len);
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len);
if (initJNIEnv() != JOI_OK)
return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
@@ -252,15 +376,15 @@ HDFS_Client_RetCode HdfsClient::hdfsWrite(const char* data, Int64 len)
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
- logError(CAT_SQL_HBASE, __FILE__, __LINE__);
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsWrite()", getLastError());
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsWrite()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
}
if (jresult == false)
{
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsWrite()", getLastError());
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsWrite()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
}
@@ -275,7 +399,7 @@ HDFS_Client_RetCode HdfsClient::hdfsWrite(const char* data, Int64 len)
//////////////////////////////////////////////////////////////////////////////
HDFS_Client_RetCode HdfsClient::hdfsClose()
{
- QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "HdfsClient::close() called.");
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::close() called.");
if (initJNIEnv() != JOI_OK)
return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION;
@@ -287,15 +411,15 @@ HDFS_Client_RetCode HdfsClient::hdfsClose()
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
- logError(CAT_SQL_HBASE, __FILE__, __LINE__);
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsClose()", getLastError());
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsClose()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION;
}
if (jresult == false)
{
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsClose()", getLastError());
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsClose()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION;
}
@@ -306,7 +430,7 @@ HDFS_Client_RetCode HdfsClient::hdfsClose()
HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath)
{
- QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::hdfsCleanUnloadPath(%s) called.",
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCleanUnloadPath(%s) called.",
uldPath.data());
if (initJNIEnv() != JOI_OK)
return HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM;
@@ -324,8 +448,8 @@ HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath)
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
- logError(CAT_SQL_HBASE, __FILE__, __LINE__);
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsCleanUnloadPath()", getLastError());
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsCleanUnloadPath()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_CLEANUP_EXCEPTION;
}
@@ -337,7 +461,7 @@ HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath)
HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath,
const NAString& dstPath)
{
- QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::hdfsMergeFiles(%s, %s) called.",
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsMergeFiles(%s, %s) called.",
srcPath.data(), dstPath.data());
if (initJNIEnv() != JOI_OK)
@@ -364,15 +488,15 @@ HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath,
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
- logError(CAT_SQL_HBASE, __FILE__, __LINE__);
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsMergeFiles()", getLastError());
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsMergeFiles()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_EXCEPTION;
}
if (jresult == false)
{
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsMergeFiles()", getLastError());
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsMergeFiles()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_EXCEPTION;
}
@@ -383,7 +507,7 @@ HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath,
HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath)
{
- QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::hdfsDeletePath(%s called.",
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsDeletePath(%s called.",
delPath.data());
if (initJNIEnv() != JOI_OK)
return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION;
@@ -402,15 +526,15 @@ HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath)
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
- logError(CAT_SQL_HBASE, __FILE__, __LINE__);
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsDeletePath()", getLastError());
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsDeletePath()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION;
}
if (jresult == false)
{
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsDeletePath()", getLastError());
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsDeletePath()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION;
}
@@ -421,7 +545,7 @@ HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath)
HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean & exist)
{
- QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::hdfsExists(%s) called.",
+ QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsExists(%s) called.",
uldPath.data());
if (initJNIEnv() != JOI_OK)
@@ -441,8 +565,8 @@ HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean &
if (jenv_->ExceptionCheck())
{
getExceptionDetails();
- logError(CAT_SQL_HBASE, __FILE__, __LINE__);
- logError(CAT_SQL_HBASE, "HdfsClient::hdfsExists()", getLastError());
+ logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS, "HdfsClient::hdfsExists()", getLastError());
jenv_->PopLocalFrame(NULL);
return HDFS_CLIENT_ERROR_HDFS_EXISTS_EXCEPTION;
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/HdfsClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h
index 8adf42f..0426ebc 100644
--- a/core/sql/executor/HdfsClient_JNI.h
+++ b/core/sql/executor/HdfsClient_JNI.h
@@ -24,6 +24,7 @@
#define HDFS_CLIENT_H
#include "JavaObjectInterface.h"
+#include "ExHdfsScan.h"
// ===========================================================================
// ===== The native HdfsScan class implements access to the Java methods
@@ -33,6 +34,11 @@
typedef enum {
HDFS_SCAN_OK = JOI_OK
,HDFS_SCAN_FIRST = JOI_LAST
+ ,HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM = HDFS_SCAN_FIRST
+ ,HDFS_SCAN_ERROR_SET_SCAN_RANGES_EXCEPTION
+ ,HDFS_SCAN_ERROR_TRAF_HDFS_READ_PARAM
+ ,HDFS_SCAN_ERROR_TRAF_HDFS_READ_EXCEPTION
+ ,HDFS_SCAN_EOR
,HDFS_SCAN_LAST
} HDFS_Scan_RetCode;
@@ -44,28 +50,28 @@ public:
: JavaObjectInterface(heap)
{}
- // Destructor
- virtual ~HdfsScan();
-
- // Get the error description.
- virtual char* getErrorText(HDFS_Scan_RetCode errEnum);
-
// Initialize JVM and all the JNI configuration.
// Must be called.
HDFS_Scan_RetCode init();
- HDFS_Scan_RetCode initScanRanges();
+ // Get the error description.
+ static char* getErrorText(HDFS_Scan_RetCode errEnum);
+
+ static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
+ HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize, HDFS_Scan_RetCode &hdfsScanRetCode);
+
+ HDFS_Scan_RetCode setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
+ HdfsFileInfoArray *hdfsFileInfoArray, int rangeTailIOSize);
- HDFS_Scan_RetCode trafHdfsRead();
+ HDFS_Scan_RetCode trafHdfsRead(NAHeap *heap, int retArray[], short arrayLen);
private:
enum JAVA_METHODS {
JM_CTOR = 0,
- JM_INIT_SCAN_RANGES,
+ JM_SET_SCAN_RANGES,
JM_TRAF_HDFS_READ,
JM_LAST
};
-
static jclass javaClass_;
static JavaMethodInit* JavaMethods_;
static bool javaMethodsInitialized_;
@@ -109,7 +115,7 @@ public:
static HdfsClient *newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode);
// Get the error description.
- virtual char* getErrorText(HDFS_Client_RetCode errEnum);
+ static char* getErrorText(HDFS_Client_RetCode errEnum);
// Initialize JVM and all the JNI configuration.
// Must be called.
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/JavaObjectInterface.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/JavaObjectInterface.cpp b/core/sql/executor/JavaObjectInterface.cpp
index 2f5b2c7..ecd8c1e 100644
--- a/core/sql/executor/JavaObjectInterface.cpp
+++ b/core/sql/executor/JavaObjectInterface.cpp
@@ -40,6 +40,9 @@
JavaVM* JavaObjectInterface::jvm_ = NULL;
jint JavaObjectInterface::jniHandleCapacity_ = 0;
+int JavaObjectInterface::debugPort_ = 0;
+int JavaObjectInterface::debugTimeout_ = 0;
+
__thread JNIEnv* jenv_ = NULL;
__thread NAString *tsRecentJMFromJNI = NULL;
jclass JavaObjectInterface::gThrowableClass = NULL;
@@ -285,7 +288,7 @@ int JavaObjectInterface::createJVM(LmJavaOptions *options)
if (mySqRoot != NULL)
{
len = strlen(mySqRoot);
- oomDumpDir = new (heap_) char[len+50];
+ oomDumpDir = new char[len+50];
strcpy(oomDumpDir, "-XX:HeapDumpPath=");
strcat(oomDumpDir, mySqRoot);
strcat(oomDumpDir, "/logs");
@@ -317,7 +320,7 @@ int JavaObjectInterface::createJVM(LmJavaOptions *options)
if (classPathArg)
free(classPathArg);
if (oomDumpDir)
- NADELETEBASIC(oomDumpDir, heap_);
+ delete oomDumpDir;
return ret;
}
@@ -346,8 +349,6 @@ JOI_RetCode JavaObjectInterface::initJVM(LmJavaOptions *options)
GetCliGlobals()->setJniErrorStr(getErrorText(JOI_ERROR_CHECK_JVM));
return JOI_ERROR_CREATE_JVM;
}
-
- needToDetach_ = false;
QRLogger::log(CAT_SQL_HDFS_JNI_TOP, LL_DEBUG, "Created a new JVM.");
}
char *jniHandleCapacityStr = getenv("TRAF_JNIHANDLE_CAPACITY");
@@ -371,7 +372,6 @@ JOI_RetCode JavaObjectInterface::initJVM(LmJavaOptions *options)
if (result != JNI_OK)
return JOI_ERROR_ATTACH_JVM;
- needToDetach_ = true;
QRLogger::log(CAT_SQL_HDFS_JNI_TOP, LL_DEBUG, "Attached to an existing JVM from another thread.");
break;
@@ -537,11 +537,10 @@ void JavaObjectInterface::logError(std::string &cat, const char* file, int line)
NABoolean JavaObjectInterface::getExceptionDetails(JNIEnv *jenv)
{
- NAString error_msg(heap_);
-
if (jenv == NULL)
jenv = jenv_;
CliGlobals *cliGlobals = GetCliGlobals();
+ NAString error_msg(heap_);
if (jenv == NULL)
{
error_msg = "Internal Error - Unable to obtain jenv";
@@ -646,7 +645,6 @@ JOI_RetCode JavaObjectInterface::initJNIEnv()
return retcode;
}
if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
- getExceptionDetails();
return JOI_ERROR_INIT_JNI;
}
return JOI_OK;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/JavaObjectInterface.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/JavaObjectInterface.h b/core/sql/executor/JavaObjectInterface.h
index 772148b..b167420 100644
--- a/core/sql/executor/JavaObjectInterface.h
+++ b/core/sql/executor/JavaObjectInterface.h
@@ -79,13 +79,10 @@ public:
protected:
// Default constructor - for creating a new JVM
- JavaObjectInterface(NAHeap *heap , int debugPort = 0, int debugTimeout = 0)
+ JavaObjectInterface(NAHeap *heap)
: heap_(heap)
,javaObj_(NULL)
- ,needToDetach_(false)
,isInitialized_(false)
- ,debugPort_(debugPort)
- ,debugTimeout_(debugTimeout)
{
tid_ = syscall(SYS_gettid);
}
@@ -94,10 +91,7 @@ protected:
JavaObjectInterface(NAHeap *heap, jobject jObj)
: heap_(heap)
,javaObj_(NULL)
- ,needToDetach_(false)
,isInitialized_(false)
- ,debugPort_(0)
- ,debugTimeout_(0)
{
tid_ = syscall(SYS_gettid);
// When jObj is not null in the constructor
@@ -113,17 +107,17 @@ protected:
virtual ~JavaObjectInterface();
// Create a new JVM
- int createJVM(LmJavaOptions *options);
+ static int createJVM(LmJavaOptions *options);
// Initialize the JVM.
- JOI_RetCode initJVM(LmJavaOptions *options = NULL);
+ static JOI_RetCode initJVM(LmJavaOptions *options = NULL);
// Initialize JVM and all the JNI configuration.
// Must be called.
JOI_RetCode init(char *className, jclass &javaclass, JavaMethodInit* JavaMethods, Int32 howManyMethods, bool methodsInitialized);
// Get the error description.
- virtual char* getErrorText(JOI_RetCode errEnum);
+ static char* getErrorText(JOI_RetCode errEnum);
NAString getLastError();
@@ -132,8 +126,8 @@ protected:
void logError(std::string &cat, const char* methodName, jstring jresult);
void logError(std::string &cat, const char* file, int line);
- JOI_RetCode initJNIEnv();
- char* buildClassPath();
+ static JOI_RetCode initJNIEnv();
+ static char* buildClassPath();
public:
void setJavaObject(jobject jobj);
@@ -152,6 +146,7 @@ public:
// Pass in jenv if the thread where the object is created is different than
// the thread where exception occurred
NABoolean getExceptionDetails(JNIEnv *jenv = NULL);
+
void appendExceptionMessages(JNIEnv *jenv, jthrowable a_exception, NAString &error_msg);
NAHeap *getHeap() { return heap_; }
@@ -166,10 +161,9 @@ protected:
static jint jniHandleCapacity_;
jobject javaObj_;
- bool needToDetach_;
bool isInitialized_;
- int debugPort_;
- int debugTimeout_;
+ static int debugPort_;
+ static int debugTimeout_;
pid_t tid_;
NAHeap *heap_;
};
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/OrcFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.cpp b/core/sql/executor/OrcFileReader.cpp
index ddaa27a..988704d 100644
--- a/core/sql/executor/OrcFileReader.cpp
+++ b/core/sql/executor/OrcFileReader.cpp
@@ -138,7 +138,8 @@ OFR_RetCode OrcFileReader::init()
javaClass_,
JavaMethods_,
(Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (lv_retcode == OFR_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return lv_retcode;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/executor/SequenceFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/SequenceFileReader.cpp b/core/sql/executor/SequenceFileReader.cpp
index 5bc2f7f..8feb1d5 100644
--- a/core/sql/executor/SequenceFileReader.cpp
+++ b/core/sql/executor/SequenceFileReader.cpp
@@ -115,7 +115,8 @@ SFR_RetCode SequenceFileReader::init()
JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;";
rc = (SFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (rc == SFR_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
@@ -440,7 +441,8 @@ SFW_RetCode SequenceFileWriter::init()
JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;";
rc = (SFW_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
- javaMethodsInitialized_ = TRUE;
+ if (rc == SFW_OK)
+ javaMethodsInitialized_ = TRUE;
pthread_mutex_unlock(&javaMethodsInitMutex_);
}
return rc;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/exp/ExpErrorEnums.h
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpErrorEnums.h b/core/sql/exp/ExpErrorEnums.h
index ba604bf..8227cb8 100644
--- a/core/sql/exp/ExpErrorEnums.h
+++ b/core/sql/exp/ExpErrorEnums.h
@@ -163,6 +163,7 @@ enum ExeErrorCode
EXE_OLAP_OVERFLOW_NOT_SUPPORTED = 8441,
EXE_ERROR_FROM_LOB_INTERFACE = 8442,
EXE_INVALID_LOB_HANDLE = 8443,
+ EXE_ERROR_HDFS_SCAN = 8447,
EXE_LAST_EXPRESSIONS_ERROR = 8499,
// ---------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/exp/ExpHbaseInterface.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpHbaseInterface.cpp b/core/sql/exp/ExpHbaseInterface.cpp
index b7746e0..8a1d426 100644
--- a/core/sql/exp/ExpHbaseInterface.cpp
+++ b/core/sql/exp/ExpHbaseInterface.cpp
@@ -48,9 +48,7 @@
ExpHbaseInterface::ExpHbaseInterface(CollHeap * heap,
const char * server,
- const char * zkPort,
- int debugPort,
- int debugTimeout)
+ const char * zkPort)
{
heap_ = heap;
hbs_ = NULL;
@@ -66,19 +64,13 @@ ExpHbaseInterface::ExpHbaseInterface(CollHeap * heap,
strcpy(zkPort_, zkPort);
else
zkPort_[0] = 0;
-
- debugPort_ = debugPort;
- debugTimeout_ = debugTimeout;
}
ExpHbaseInterface* ExpHbaseInterface::newInstance(CollHeap* heap,
const char* server,
- const char *zkPort,
- int debugPort,
- int debugTimeout)
+ const char *zkPort)
{
- return new (heap) ExpHbaseInterface_JNI(heap, server, TRUE, zkPort,
- debugPort, debugTimeout); // This is the transactional interface
+ return new (heap) ExpHbaseInterface_JNI(heap, server, TRUE,zkPort);
}
NABoolean isParentQueryCanceled()
@@ -283,8 +275,8 @@ char * getHbaseErrStr(Lng32 errEnum)
// ===========================================================================
ExpHbaseInterface_JNI::ExpHbaseInterface_JNI(CollHeap* heap, const char* server, bool useTRex,
- const char *zkPort, int debugPort, int debugTimeout)
- : ExpHbaseInterface(heap, server, zkPort, debugPort, debugTimeout)
+ const char *zkPort)
+ : ExpHbaseInterface(heap, server, zkPort)
,useTRex_(useTRex)
,client_(NULL)
,htc_(NULL)
@@ -324,7 +316,7 @@ Lng32 ExpHbaseInterface_JNI::init(ExHbaseAccessStats *hbs)
if (client_ == NULL)
{
HBaseClient_JNI::logIt("ExpHbaseInterface_JNI::init() creating new client.");
- client_ = HBaseClient_JNI::getInstance(debugPort_, debugTimeout_);
+ client_ = HBaseClient_JNI::getInstance();
if (client_->isInitialized() == FALSE)
{
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/exp/ExpHbaseInterface.h
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpHbaseInterface.h b/core/sql/exp/ExpHbaseInterface.h
index f7b23cb..f68de05 100644
--- a/core/sql/exp/ExpHbaseInterface.h
+++ b/core/sql/exp/ExpHbaseInterface.h
@@ -71,9 +71,7 @@ class ExpHbaseInterface : public NABasicObject
static ExpHbaseInterface* newInstance(CollHeap* heap,
const char* server = NULL,
- const char *zkPort = NULL,
- int debugPort = 0,
- int DebugTimeout = 0);
+ const char *zkPort = NULL);
virtual ~ExpHbaseInterface()
{}
@@ -389,16 +387,12 @@ protected:
ExpHbaseInterface(CollHeap * heap,
const char * server = NULL,
- const char * zkPort = NULL,
- int debugPort = 0,
- int debugTimeout = 0);
+ const char * zkPort = NULL);
CollHeap * heap_;
ExHbaseAccessStats * hbs_;
char server_[MAX_SERVER_SIZE+1];
char zkPort_[MAX_PORT_SIZE+1];
- int debugPort_;
- int debugTimeout_;
};
char * getHbaseErrStr(Lng32 errEnum);
@@ -410,7 +404,7 @@ class ExpHbaseInterface_JNI : public ExpHbaseInterface
ExpHbaseInterface_JNI(CollHeap* heap,
const char* server, bool useTRex,
- const char *zkPort, int debugPort, int debugTimeout);
+ const char *zkPort);
virtual ~ExpHbaseInterface_JNI();
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/exp/ExpLOBinterface.h
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBinterface.h b/core/sql/exp/ExpLOBinterface.h
index df6c142..b98d2b4 100644
--- a/core/sql/exp/ExpLOBinterface.h
+++ b/core/sql/exp/ExpLOBinterface.h
@@ -344,19 +344,6 @@ Lng32 ExpLOBInterfaceGetLobLength(ExLobGlobals * exLobGlob,
);
-/*
-class HdfsFileInfo
-{
- public:
- char * fileName() { return fileName_; }
- Int64 getStartOffset() { return startOffset_; }
- Int64 getBytesToRead() { return bytesToRead_; }
- Lng32 entryNum_; // 0 based, first entry is entry num 0.
- NABasicPtr fileName_;
- Int64 startOffset_;
- Int64 bytesToRead_;
-};
-*/
#endif
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/generator/GenRelScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp
index 013651b..3d01223 100644
--- a/core/sql/generator/GenRelScan.cpp
+++ b/core/sql/generator/GenRelScan.cpp
@@ -1391,6 +1391,9 @@ if (hTabStats->isOrcFile())
hdfsscan_tdb->setUseCif(useCIF);
hdfsscan_tdb->setUseCifDefrag(useCIFDegrag);
+ if (CmpCommon::getDefault(USE_LIBHDFS_SCAN) == DF_ON)
+ hdfsscan_tdb->setUseLibhdfsScan(TRUE);
+
if(!generator->explainDisabled()) {
generator->setExplainTuple(
addExplainInfo(hdfsscan_tdb, 0, 0, generator));
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/qmscommon/QRLogger.cpp
----------------------------------------------------------------------
diff --git a/core/sql/qmscommon/QRLogger.cpp b/core/sql/qmscommon/QRLogger.cpp
index f4fb75d..0b8c398 100644
--- a/core/sql/qmscommon/QRLogger.cpp
+++ b/core/sql/qmscommon/QRLogger.cpp
@@ -60,6 +60,7 @@ std::string CAT_SQL_HDFS_SEQ_FILE_READER = "SQL.HDFS.SeqFileReader";
std::string CAT_SQL_HDFS_SEQ_FILE_WRITER = "SQL.HDFS.SeqFileWriter";
std::string CAT_SQL_HDFS_ORC_FILE_READER = "SQL.HDFS.OrcFileReader";
std::string CAT_SQL_HBASE = "SQL.HBase";
+std::string CAT_SQL_HDFS = "SQL.HDFS";
// these categories are currently not used
std::string CAT_SQL_QMP = "SQL.Qmp";
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/qmscommon/QRLogger.h
----------------------------------------------------------------------
diff --git a/core/sql/qmscommon/QRLogger.h b/core/sql/qmscommon/QRLogger.h
index 5cabac4..3be016e 100644
--- a/core/sql/qmscommon/QRLogger.h
+++ b/core/sql/qmscommon/QRLogger.h
@@ -67,6 +67,7 @@ extern std::string CAT_SQL_HDFS_SEQ_FILE_READER;
extern std::string CAT_SQL_HDFS_SEQ_FILE_WRITER;
extern std::string CAT_SQL_HDFS_ORC_FILE_READER;
extern std::string CAT_SQL_HBASE;
+extern std::string CAT_SQL_HDFS;
class ComDiagsArea;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/sqlcomp/DefaultConstants.h
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h
index b7e4d0d..339d55f 100644
--- a/core/sql/sqlcomp/DefaultConstants.h
+++ b/core/sql/sqlcomp/DefaultConstants.h
@@ -3306,6 +3306,9 @@ enum DefaultConstants
SUPPRESS_CHAR_LIMIT_CHECK,
BMO_MEMORY_ESTIMATE_OUTLIER_FACTOR,
+
+ // Use the earlier implementation of HdfsScan via libhdfs
+ USE_LIBHDFS_SCAN,
// This enum constant must be the LAST one in the list; it's a count,
// not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)!
__NUM_DEFAULT_ATTRIBUTES
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index a75c20b..28148cd 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -3033,6 +3033,8 @@ XDDkwd__(SUBQUERY_UNNESTING, "ON"),
// Use large queues on RHS of Flow/Nested Join when appropriate
DDkwd__(USE_LARGE_QUEUES, "ON"),
+ DDkwd__(USE_LIBHDFS_SCAN, "ON"),
+
DDkwd__(USE_MAINTAIN_CONTROL_TABLE, "OFF"),
DDkwd__(USE_OLD_DT_CONSTRUCTOR, "OFF"),
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 8d2052f..1af2c49 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -54,6 +54,7 @@ public class HDFSClient
private static FileSystem defaultFs_ = null;
private FileSystem fs_ = null;
private int bufNo_;
+ private int rangeNo_;
private FSDataInputStream fsdis_;
private OutputStream outStream_;
private String filename_;
@@ -66,7 +67,7 @@ public class HDFSClient
private int blockSize_;
private int bytesRead_;
private Future future_ = null;
-
+ private int isEOF_ = 0;
static {
String confFile = System.getProperty("trafodion.log4j.configFile");
System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
@@ -111,9 +112,10 @@ public class HDFSClient
{
}
- public HDFSClient(int bufNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+ public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
{
bufNo_ = bufNo;
+ rangeNo_ = rangeNo;
filename_ = filename;
Path filepath = new Path(filename_);
fs_ = FileSystem.get(filepath.toUri(),config_);
@@ -164,13 +166,27 @@ public class HDFSClient
return bytesRead;
}
+ public int getRangeNo()
+ {
+ return rangeNo_;
+ }
+
+ public int isEOF()
+ {
+ return isEOF_;
+ }
+
public int trafHdfsReadBuffer() throws IOException, InterruptedException, ExecutionException
{
int bytesRead;
int totalBytesRead = 0;
while (true) {
bytesRead = trafHdfsRead();
- if (bytesRead == -1 || bytesRead == 0)
+ if (bytesRead == -1) {
+ isEOF_ = 1;
+ return totalBytesRead;
+ }
+ if (bytesRead == 0)
return totalBytesRead;
totalBytesRead += bytesRead;
if (totalBytesRead == bufLen_)
http://git-wip-us.apache.org/repos/asf/trafodion/blob/f17e15ee/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index bf81ab0..9fb145e 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -88,7 +88,11 @@ public class HdfsScan
System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
}
- HdfsScan(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[]) throws IOException
+ public HdfsScan()
+ {
+ }
+
+ public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[]) throws IOException
{
buf_ = new ByteBuffer[2];
bufLen_ = new int[2];
@@ -119,13 +123,14 @@ public class HdfsScan
public void hdfsScanRange(int bufNo) throws IOException
{
- System.out.println (" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo);
+ if (logger_.isDebugEnabled())
+ logger_.debug(" CurrentRange " + currRange_ + " LenRemain " + lenRemain_ + " BufNo " + bufNo);
int readLength;
if (lenRemain_ > bufLen_[bufNo])
readLength = bufLen_[bufNo];
else
readLength = (int)lenRemain_;
- hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
+ hdfsClient_[bufNo] = new HDFSClient(bufNo, currRange_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currPos_, readLength);
lenRemain_ -= readLength;
currPos_ += readLength;
if (lenRemain_ == 0) {
@@ -144,29 +149,44 @@ public class HdfsScan
int[] retArray;
int byteCompleted;
int bufNo;
-
+ int rangeNo;
+ int isEOF;
+
+ if (hdfsScanRanges_ == null)
+ throw new IOException("Scan ranges are not yet set");
if (scanCompleted_)
return null;
- retArray = new int[2];
+ retArray = new int[4];
switch (lastBufCompleted_) {
case -1:
case 1:
byteCompleted = hdfsClient_[0].trafHdfsReadBuffer();
bufNo = 0;
+ rangeNo = hdfsClient_[0].getRangeNo();
+ isEOF = hdfsClient_[0].isEOF();
break;
case 0:
byteCompleted = hdfsClient_[1].trafHdfsReadBuffer();
bufNo = 1;
+ rangeNo = hdfsClient_[1].getRangeNo();
+ isEOF = hdfsClient_[1].isEOF();
break;
default:
bufNo = -1;
byteCompleted = -1;
+ rangeNo = -1;
+ isEOF = 0;
}
lastBufCompleted_ = bufNo;
retArray[0] = byteCompleted;
retArray[1] = bufNo;
- System.out.println (" Buffer No " + retArray[1] + " Bytes Read " + retArray[0]);
+ retArray[2] = rangeNo;
+ retArray[3] = isEOF;
+ if (logger_.isDebugEnabled())
+ logger_.debug(" Range No " + retArray[2] + " Buffer No " + retArray[1] + " Bytes Read " + retArray[0] + " isEOF " + retArray[3]);
lastBufCompleted_ = bufNo;
+ if ((isEOF == 1) && (currRange_ == (hdfsScanRanges_.length-1)))
+ lastScanRangeScheduled_ = true;
if (lastScanRangeScheduled_) {
scanCompleted_ = true;
return retArray;
@@ -233,7 +253,8 @@ public class HdfsScan
}
}
long time1 = System.currentTimeMillis();
- HdfsScan hdfsScan = new HdfsScan(buf1, buf2, fileName, pos, len);
+ HdfsScan hdfsScan = new HdfsScan();
+ hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len);
int[] retArray;
int bytesCompleted;
while (true) {