Posted to commits@trafodion.apache.org by se...@apache.org on 2018/05/10 21:08:21 UTC
[2/4] trafodion git commit: [TRAFODION-3065] Trafodion to support compressed Hive Text formatted tables
[TRAFODION-3065] Trafodion to support compressed Hive Text formatted tables
Compressed text files are now supported by the new scan implementation that uses the HDFS Java APIs. When Hadoop is not configured with the codec for a particular compression type, an error is thrown.
[TRAFODION-2982] JNI HDFS interface should support varied sized large buffers for read/write
A new CQD, HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB, is introduced to chunk reads and writes whenever an interim Java byte array is involved.
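For context, this is roughly the Hadoop API pattern the new read path relies on: CompressionCodecFactory maps a file name to a configured codec and returns null when none matches, and the caller raises an error for files whose suffix demands a codec that is not configured. A minimal, self-contained Java sketch (class and method names here are illustrative, not the patch's code):

    import java.io.IOException;
    import java.io.InputStream;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;

    // Illustrative sketch only; the real logic is in HDFSClient.java further down.
    class CodecLookupSketch {
        static InputStream openPossiblyCompressed(FileSystem fs, Path path, Configuration conf)
                throws IOException {
            CompressionCodecFactory factory = new CompressionCodecFactory(conf);
            CompressionCodec codec = factory.getCodec(path);   // null if no codec matches the suffix
            if (codec != null)
                return codec.createInputStream(fs.open(path)); // decompressing stream
            if (path.getName().endsWith(".lzo"))
                // Suffix says LZOP but no codec is configured in Hadoop: surface an error,
                // mirroring the behavior described in this commit.
                throw new IOException("LZOP compression codec is not configured in Hadoop");
            return fs.open(path);                              // plain uncompressed read
        }
    }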
Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/96cab4dd
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/96cab4dd
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/96cab4dd
Branch: refs/heads/master
Commit: 96cab4ddd086a59ebc0eab8ac4a93ee3cf315aac
Parents: f216cdb
Author: selvaganesang <se...@esgyn.com>
Authored: Wed May 9 00:36:04 2018 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Wed May 9 18:51:31 2018 +0000
----------------------------------------------------------------------
core/sql/comexe/ComTdbFastTransport.cpp | 1 +
core/sql/comexe/ComTdbFastTransport.h | 7 +-
core/sql/comexe/ComTdbHdfsScan.cpp | 1 +
core/sql/comexe/ComTdbHdfsScan.h | 5 +
core/sql/executor/ExHdfsScan.cpp | 16 ++-
core/sql/executor/HdfsClient_JNI.cpp | 126 +++++++++++------
core/sql/executor/HdfsClient_JNI.h | 9 +-
.../sql/executor/org_trafodion_sql_HDFSClient.h | 31 ++++
core/sql/exp/ExpLOBinterface.h | 10 ++
core/sql/generator/GenFastTransport.cpp | 4 +
core/sql/generator/GenRelScan.cpp | 5 +-
core/sql/regress/hive/DIFF002.KNOWN | 14 ++
core/sql/regress/hive/FILTER002 | 33 +++++
core/sql/sqlcomp/DefaultConstants.h | 5 +
core/sql/sqlcomp/nadefaults.cpp | 1 +
.../main/java/org/trafodion/sql/HDFSClient.java | 140 +++++++++++++++----
.../main/java/org/trafodion/sql/HdfsScan.java | 34 ++++-
17 files changed, 354 insertions(+), 88 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbFastTransport.cpp b/core/sql/comexe/ComTdbFastTransport.cpp
index 49d830e..90f635f 100644
--- a/core/sql/comexe/ComTdbFastTransport.cpp
+++ b/core/sql/comexe/ComTdbFastTransport.cpp
@@ -99,6 +99,7 @@ ComTdbFastExtract::ComTdbFastExtract(
hdfsReplication_(replication),
ioTimeout_(ioTimeout),
childDataRowLen_(childDataRowLen),
+ hdfsIoByteArraySize_(0),
modTSforDir_(-1)
{
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbFastTransport.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbFastTransport.h b/core/sql/comexe/ComTdbFastTransport.h
index 37c01da..0666953 100644
--- a/core/sql/comexe/ComTdbFastTransport.h
+++ b/core/sql/comexe/ComTdbFastTransport.h
@@ -369,6 +369,10 @@ public:
void setModTSforDir(Int64 v) { modTSforDir_ = v; }
Int64 getModTSforDir() const { return modTSforDir_; }
+ void setHdfsIoByteArraySize(int size)
+ { hdfsIoByteArraySize_ = size; }
+ UInt16 getHdfsIoByteArraySize()
+ { return hdfsIoByteArraySize_; }
protected:
NABasicPtr targetName_; // 00 - 07
NABasicPtr delimiter_; // 08 - 15
@@ -395,9 +399,10 @@ protected:
UInt16 filler_; // 130 - 131
UInt32 childDataRowLen_; // 132 - 135
Int64 modTSforDir_; // 136 - 143
+ UInt16 hdfsIoByteArraySize_; // 144 - 147
// Make sure class size is a multiple of 8
- char fillerComTdbFastTransport_[8]; // 144 - 151
+ char fillerComTdbFastTransport_[4]; // 148 - 151
};
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.cpp b/core/sql/comexe/ComTdbHdfsScan.cpp
index a0bf5c1..f5e2907 100755
--- a/core/sql/comexe/ComTdbHdfsScan.cpp
+++ b/core/sql/comexe/ComTdbHdfsScan.cpp
@@ -121,6 +121,7 @@ ComTdbHdfsScan::ComTdbHdfsScan(
hdfsRootDir_(hdfsRootDir),
modTSforDir_(modTSforDir),
numOfPartCols_(numOfPartCols),
+ hdfsIoByteArraySize_(0),
hdfsDirsToCheck_(hdfsDirsToCheck)
{};
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/comexe/ComTdbHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h
index ea995fb..f9a0afd 100755
--- a/core/sql/comexe/ComTdbHdfsScan.h
+++ b/core/sql/comexe/ComTdbHdfsScan.h
@@ -136,6 +136,7 @@ class ComTdbHdfsScan : public ComTdb
UInt16 origTuppIndex_; // 188 - 189
char fillersComTdbHdfsScan1_[2]; // 190 - 191
NABasicPtr nullFormat_; // 192 - 199
+ UInt16 hdfsIoByteArraySize_; // 198 - 199
// next 4 params are used to check if data under hdfsFileDir
// was modified after query was compiled.
@@ -362,6 +363,10 @@ public:
Queue * hdfsDirsToCheck() { return hdfsDirsToCheck_; }
char *hdfsRootDir() { return hdfsRootDir_; }
+ void setHdfsIoByteArraySize(int size)
+ { hdfsIoByteArraySize_ = size; }
+ UInt16 getHdfsIoByteArraySize()
+ { return hdfsIoByteArraySize_; }
};
inline ComTdb * ComTdbHdfsScan::getChildTdb()
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index 97697f3..e5d73dc 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -129,7 +129,7 @@ ExHdfsScanTcb::ExHdfsScanTcb(
Space * space = (glob ? glob->getSpace() : 0);
CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
- if (isSequenceFile() || hdfsScanTdb.isCompressedFile())
+ if (isSequenceFile())
useLibhdfsScan_ = TRUE;
lobGlob_ = NULL;
hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
@@ -569,6 +569,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
break;
}
hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_,
+ hdfsScanTdb().hdfsIoByteArraySize_,
&hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_,
hdfsStats_, hdfsScanRetCode);
if (hdfsScanRetCode != HDFS_SCAN_OK) {
@@ -602,6 +603,11 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
break;
}
hdfo = hdfsFileInfoListAsArray_.at(retArray_[RANGE_NO]);
+ if (retArray_[BYTES_COMPLETED] == 0) {
+ ex_assert(headRoomCopied_ == 0, "Internal Error in HdfsScan");
+ step_ = TRAF_HDFS_READ;
+ break;
+ }
bufEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED];
if (retArray_[RANGE_NO] != prevRangeNum_) {
currRangeBytesRead_ = retArray_[BYTES_COMPLETED];
@@ -624,13 +630,9 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead();
else
extraBytesRead_ = 0;
+ ex_assert(extraBytesRead_ >= 0, "Negative number of extraBytesRead");
// headRoom_ is the number of extra bytes to be read (rangeTailIOSize)
// If the whole range fits in one buffer, it is needed to process rows till EOF for the last range alone.
-/*
- if (retArray_[IS_EOF] && (extraBytesRead_ < headRoom_)
- && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1)))
- extraBytesRead_ = 0;
-*/
if (numFiles_ <= 1) {
if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ && (retArray_[RANGE_NO] == (hdfsFileInfoListAsArray_.entries()-1)))
extraBytesRead_ = 0;
@@ -2048,7 +2050,7 @@ void ExHdfsScanTcb::computeRangesAtRuntime()
}
else
e->bytesToRead_ = (Int64) fileInfos[h].mSize;
-
+ e->compressionMethod_ = 0;
hdfsFileInfoListAsArray_.insertAt(h, e);
}
}
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp
index 0622b50..5b8e850 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -26,6 +26,7 @@
#include "Context.h"
#include "jni.h"
#include "HdfsClient_JNI.h"
+#include "org_trafodion_sql_HDFSClient.h"
// ===========================================================================
// ===== Class HdfsScan
@@ -83,7 +84,7 @@ HDFS_Scan_RetCode HdfsScan::init()
JavaMethods_[JM_CTOR ].jm_name = "<init>";
JavaMethods_[JM_CTOR ].jm_signature = "()V";
JavaMethods_[JM_SET_SCAN_RANGES].jm_name = "setScanRanges";
- JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/lang/String;[J[J[I)V";
+ JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;S[Ljava/lang/String;[J[J[I[S)V";
JavaMethods_[JM_TRAF_HDFS_READ].jm_name = "trafHdfsRead";
JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
JavaMethods_[JM_STOP].jm_name = "stop";
@@ -106,7 +107,7 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
}
/////////////////////////////////////////////////////////////////////////////
-HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
+HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize,
HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize)
{
QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called.");
@@ -138,10 +139,12 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan
jenv_->PopLocalFrame(NULL);
return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
}
+ jshort j_hdfsIoByteArraySize = hdfsIoByteArraySize;
jobjectArray j_filenames = NULL;
jlongArray j_offsets = NULL;
jlongArray j_lens = NULL;
jintArray j_rangenums = NULL;
+ jshortArray j_compress = NULL;
HdfsFileInfo *hdfo;
jstring j_obj;
@@ -184,7 +187,11 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan
return hdfsScanRetCode;
}
}
- long len = hdfo->getBytesToRead()+rangeTailIOSize;
+ long len;
+ if (hdfo->getBytesToRead() > (LONG_MAX-rangeTailIOSize))
+ len = LONG_MAX;
+ else
+ len = hdfo->getBytesToRead()+rangeTailIOSize;
jenv_->SetLongArrayRegion(j_lens, rangeCount, 1, &len);
if (j_rangenums == NULL) {
@@ -196,12 +203,24 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan
}
jint tdbRangeNum = i;
jenv_->SetIntArrayRegion(j_rangenums, rangeCount, 1, &tdbRangeNum);
+
+ if (j_compress == NULL) {
+ j_compress = jenv_->NewShortArray(numRanges);
+ if (jenv_->ExceptionCheck()) {
+ jenv_->PopLocalFrame(NULL);
+ return hdfsScanRetCode;
+ }
+ }
+ short compressionMethod = (short)hdfo->getCompressionMethod();
+ //ex_assert(compressionMethod >= 0 && compressionMethod <= ComCompressionInfo::LZOP, "Illegal CompressionMethod Value");
+ jenv_->SetShortArrayRegion(j_compress, rangeCount, 1, &compressionMethod);
}
if (hdfsStats_ != NULL)
hdfsStats_->getHdfsTimer().start();
tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name;
- jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_filenames, j_offsets, j_lens, j_rangenums);
+ jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_hdfsIoByteArraySize,
+ j_filenames, j_offsets, j_lens, j_rangenums, j_compress);
if (hdfsStats_ != NULL) {
hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
hdfsStats_->incHdfsCalls();
@@ -216,7 +235,7 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScan
}
HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
- HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize,
+ short hdfsIoByteArraySize, HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize,
ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode)
{
QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::newInstance() called.");
@@ -228,7 +247,7 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfs
if (hdfsScan != NULL) {
hdfsScanRetCode = hdfsScan->init();
if (hdfsScanRetCode == HDFS_SCAN_OK)
- hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize,
+ hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, hdfsIoByteArraySize,
hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize);
if (hdfsScanRetCode == HDFS_SCAN_OK)
hdfsScan->setHdfsStats(hdfsStats);
@@ -359,7 +378,7 @@ void HdfsClient::deleteHdfsFileInfo()
hdfsFileInfo_ = NULL;
}
-HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode)
+HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, short hdfsIoByteArraySize)
{
QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called.");
@@ -369,8 +388,10 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HD
HdfsClient *hdfsClient = new (heap) HdfsClient(heap);
if (hdfsClient != NULL) {
retCode = hdfsClient->init();
- if (retCode == HDFS_CLIENT_OK)
+ if (retCode == HDFS_CLIENT_OK) {
hdfsClient->setHdfsStats(hdfsStats);
+ hdfsClient->setIoByteArraySize(hdfsIoByteArraySize);
+ }
else {
NADELETE(hdfsClient, HdfsClient, heap);
hdfsClient = NULL;
@@ -574,41 +595,50 @@ Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hd
hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
return 0;
}
-
- //Write the requisite bytes into the file
- jbyteArray jbArray = jenv_->NewByteArray( len);
- if (!jbArray) {
- GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM));
- jenv_->PopLocalFrame(NULL);
- hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM;
- return 0;
- }
- jenv_->SetByteArrayRegion(jbArray, 0, len, (const jbyte*)data);
-
- if (hdfsStats_ != NULL)
- hdfsStats_->getHdfsTimer().start();
-
- tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name;
- // Java method returns the cumulative bytes written
- jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID, jbArray);
-
- if (hdfsStats_ != NULL) {
- hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
- hdfsStats_->incHdfsCalls();
- }
- if (jenv_->ExceptionCheck())
+ Int64 lenRemain = len;
+ Int64 writeLen;
+ Int64 chunkLen = (ioByteArraySize_ > 0 ? ioByteArraySize_ * 1024 : 0);
+ Int64 offset = 0;
+ do
{
- getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()");
- jenv_->PopLocalFrame(NULL);
- hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
- return 0;
- }
-
+ if ((chunkLen > 0) && (lenRemain > chunkLen))
+ writeLen = chunkLen;
+ else
+ writeLen = lenRemain;
+ //Write the requisite bytes into the file
+ jbyteArray jbArray = jenv_->NewByteArray(writeLen);
+ if (!jbArray) {
+ GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM));
+ jenv_->PopLocalFrame(NULL);
+ hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM;
+ return 0;
+ }
+ jenv_->SetByteArrayRegion(jbArray, 0, writeLen, (const jbyte*)(data+offset));
+
+ if (hdfsStats_ != NULL)
+ hdfsStats_->getHdfsTimer().start();
+
+ tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name;
+ // Java method returns the cumulative bytes written
+ jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID, jbArray);
+
+ if (hdfsStats_ != NULL) {
+ hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+ hdfsStats_->incHdfsCalls();
+ }
+ if (jenv_->ExceptionCheck())
+ {
+ getExceptionDetails(__FILE__, __LINE__, "HdfsClient::hdfsWrite()");
+ jenv_->PopLocalFrame(NULL);
+ hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
+ return 0;
+ }
+ lenRemain -= writeLen;
+ offset += writeLen;
+ } while (lenRemain > 0);
jenv_->PopLocalFrame(NULL);
hdfsClientRetcode = HDFS_CLIENT_OK;
- Int32 bytesWritten = totalBytesWritten - totalBytesWritten_;
- totalBytesWritten_ = totalBytesWritten;
- return bytesWritten;
+ return len;
}
Int32 HdfsClient::hdfsRead(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode)
@@ -1018,6 +1048,22 @@ jint JNICALL Java_org_trafodion_sql_HDFSClient_sendFileStatus
return (jint) retcode;
}
+JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_copyToByteBuffer
+ (JNIEnv *jenv, jobject j_obj, jobject j_buf, jint offset, jbyteArray j_bufArray, jint copyLen)
+{
+ void *bufBacking;
+
+ bufBacking = jenv->GetDirectBufferAddress(j_buf);
+ if (bufBacking == NULL)
+ return -1;
+ jlong capacity = jenv->GetDirectBufferCapacity(j_buf);
+ jbyte *byteBufferAddr = (jbyte *)bufBacking + offset;
+ if ((offset + copyLen) > capacity)
+ return -2;
+ jenv->GetByteArrayRegion(j_bufArray, 0, copyLen, byteBufferAddr);
+ return 0;
+}
+
#ifdef __cplusplus
}
#endif
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/HdfsClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h
index 6f68f4d..a85c590 100644
--- a/core/sql/executor/HdfsClient_JNI.h
+++ b/core/sql/executor/HdfsClient_JNI.h
@@ -66,11 +66,11 @@ public:
// Get the error description.
static char* getErrorText(HDFS_Scan_RetCode errEnum);
- static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
+ static HdfsScan *newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize,
HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize,
ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode);
- HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize,
+ HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, short hdfsIoByteArraySize,
HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges,
int rangeTailIOSize);
@@ -169,9 +169,11 @@ public:
}
~HdfsClient();
- static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode);
+ static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode, short hdfsIoByteArraySize = 0);
static HdfsClient *getInstance();
static void deleteInstance();
+ void setIoByteArraySize(short size)
+ { ioByteArraySize_ = size; }
// Get the error description.
static char* getErrorText(HDFS_Client_RetCode errEnum);
@@ -224,6 +226,7 @@ private:
int numFiles_;
char *path_;
Int64 totalBytesWritten_;
+ short ioByteArraySize_;
ExHdfsScanStats *hdfsStats_;
static jclass javaClass_;
static JavaMethodInit* JavaMethods_;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/executor/org_trafodion_sql_HDFSClient.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/org_trafodion_sql_HDFSClient.h b/core/sql/executor/org_trafodion_sql_HDFSClient.h
new file mode 100644
index 0000000..6e3485e
--- /dev/null
+++ b/core/sql/executor/org_trafodion_sql_HDFSClient.h
@@ -0,0 +1,31 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_trafodion_sql_HDFSClient */
+
+#ifndef _Included_org_trafodion_sql_HDFSClient
+#define _Included_org_trafodion_sql_HDFSClient
+#ifdef __cplusplus
+extern "C" {
+#endif
+#undef org_trafodion_sql_HDFSClient_UNCOMPRESSED
+#define org_trafodion_sql_HDFSClient_UNCOMPRESSED 1L
+/*
+ * Class: org_trafodion_sql_HDFSClient
+ * Method: copyToByteBuffer
+ * Signature: (Ljava/nio/ByteBuffer;I[BI)I
+ */
+JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_copyToByteBuffer
+ (JNIEnv *, jobject, jobject, jint, jbyteArray, jint);
+
+/*
+ * Class: org_trafodion_sql_HDFSClient
+ * Method: sendFileStatus
+ * Signature: (JIIZLjava/lang/String;JJSJLjava/lang/String;Ljava/lang/String;SJ)I
+ */
+JNIEXPORT jint JNICALL Java_org_trafodion_sql_HDFSClient_sendFileStatus
+ (JNIEnv *, jobject, jlong, jint, jint, jboolean, jstring, jlong, jlong, jshort, jlong, jstring, jstring, jshort, jlong);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/exp/ExpLOBinterface.h
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBinterface.h b/core/sql/exp/ExpLOBinterface.h
index 8194859..54435a3 100644
--- a/core/sql/exp/ExpLOBinterface.h
+++ b/core/sql/exp/ExpLOBinterface.h
@@ -29,6 +29,13 @@
class HdfsFileInfo
{
public:
+ HdfsFileInfo() {
+ entryNum_ = -1;
+ startOffset_ = -1;
+ bytesToRead_ = 0;
+ compressionMethod_ = 0;
+ flags_ = 0;
+ }
char * fileName() { return fileName_; }
// used for text/seq file access
@@ -39,6 +46,8 @@ class HdfsFileInfo
Int64 getStartRow() { return startOffset_; }
Int64 getNumRows() { return bytesToRead_; }
+ Int16 getCompressionMethod() const { return compressionMethod_; }
+
Lng32 getFlags() { return flags_; }
void setFileIsLocal(NABoolean v)
@@ -64,6 +73,7 @@ class HdfsFileInfo
NABasicPtr fileName_;
Int64 startOffset_;
Int64 bytesToRead_;
+ Int16 compressionMethod_;
};
typedef HdfsFileInfo* HdfsFileInfoPtr;
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/generator/GenFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenFastTransport.cpp b/core/sql/generator/GenFastTransport.cpp
index 75c1e0e..5019485 100644
--- a/core/sql/generator/GenFastTransport.cpp
+++ b/core/sql/generator/GenFastTransport.cpp
@@ -476,8 +476,12 @@ static short ft_codegen(Generator *generator,
replication
);
+ UInt16 hdfsIoByteArraySize = (UInt16)
+ CmpCommon::getDefaultNumeric(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB);
+ tdb->setHdfsIoByteArraySize(hdfsIoByteArraySize);
tdb->setSequenceFile(isSequenceFile);
tdb->setHdfsCompressed(CmpCommon::getDefaultNumeric(TRAF_UNLOAD_HDFS_COMPRESS)!=0);
+
if ((hiveNAColArray) &&
(hiveInsertErrMode == 2))
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/generator/GenRelScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp
index 827b94c..0a133b2 100644
--- a/core/sql/generator/GenRelScan.cpp
+++ b/core/sql/generator/GenRelScan.cpp
@@ -1234,7 +1234,8 @@ if (hTabStats->isOrcFile())
if (hdfsBufSizeTesting)
hdfsBufSize = hdfsBufSizeTesting;
}
-
+ UInt16 hdfsIoByteArraySize = (UInt16)
+ CmpCommon::getDefaultNumeric(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB);
UInt32 rangeTailIOSize = (UInt32)
CmpCommon::getDefaultNumeric(HDFS_IO_RANGE_TAIL);
if (rangeTailIOSize == 0)
@@ -1362,7 +1363,7 @@ if (hTabStats->isOrcFile())
hdfsRootDir, modTS, numOfPartLevels, hdfsDirsToCheck
);
-
+ hdfsscan_tdb->setHdfsIoByteArraySize(hdfsIoByteArraySize);
generator->initTdbFields(hdfsscan_tdb);
hdfsscan_tdb->setUseCursorMulti(useCursorMulti);
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/regress/hive/DIFF002.KNOWN
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/DIFF002.KNOWN b/core/sql/regress/hive/DIFF002.KNOWN
new file mode 100644
index 0000000..24c6ed7
--- /dev/null
+++ b/core/sql/regress/hive/DIFF002.KNOWN
@@ -0,0 +1,14 @@
+359,360c359,362
+< (EXPR)
+< ----------
+---
+> *** ERROR[8447] An error occurred during hdfs access. Error Detail: SETUP_HDFS_SCAN java.io.IOException: LZOP compression codec is not configured in Hadoop
+> stackTraceRemoved
+> stackTraceRemoved
+> stackTraceRemoved
+362,364c364
+< 73049
+<
+< --- 1 row(s) selected.
+---
+> --- 0 row(s) selected.
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/regress/hive/FILTER002
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/FILTER002 b/core/sql/regress/hive/FILTER002
new file mode 100755
index 0000000..83f8fbf
--- /dev/null
+++ b/core/sql/regress/hive/FILTER002
@@ -0,0 +1,33 @@
+#! /bin/sh
+# @@@ START COPYRIGHT @@@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# @@@ END COPYRIGHT @@@
+
+
+fil=$1
+if [ "$fil" = "" ]; then
+ echo "Usage: $0 filename"
+ exit 1
+fi
+
+fil=$1
+sed "
+s/org.trafodion.sql.*/stackTraceRemoved/g
+" $fil
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/sqlcomp/DefaultConstants.h
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h
index 0096424..b2b2bb9 100644
--- a/core/sql/sqlcomp/DefaultConstants.h
+++ b/core/sql/sqlcomp/DefaultConstants.h
@@ -3319,6 +3319,11 @@ enum DefaultConstants
// This enum constant must be the LAST one in the list; it's a count,
// not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)!
+ // Size of byte[] in java when direct byteBuffer can't be used
+ // Used to read compressed hdfs text files and to write
+ // both compressed and uncompressed hdfs files
+ HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB,
+
__NUM_DEFAULT_ATTRIBUTES
};
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index 1c47c54..7e5cfa1 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -1496,6 +1496,7 @@ SDDkwd__(EXE_DIAGNOSTIC_EVENTS, "OFF"),
DDui1__(HDFS_IO_BUFFERSIZE, "65536"),
DDui___(HDFS_IO_BUFFERSIZE_BYTES, "0"),
+ DDui___(HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB, "1024"),
// The value 0 denotes RangeTail = max record length of table.
DDui___(HDFS_IO_RANGE_TAIL, "0"),
DDkwd__(HDFS_PREFETCH, "ON"),
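The default of 1024 KB above bounds the size of each interim Java byte array. A rough Java sketch of the chunking idea that the HdfsClient::hdfsWrite change implements on the C++ side (method and variable names are hypothetical):

    import java.io.IOException;
    import java.io.OutputStream;

    // Hypothetical illustration: split one large write into byte[]-sized pieces of at most
    // ioByteArraySizeKB * 1024 bytes, as HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB intends.
    class ChunkedWriteSketch {
        static long writeChunked(OutputStream out, byte[] data, int ioByteArraySizeKB)
                throws IOException {
            int chunkLen = (ioByteArraySizeKB > 0) ? ioByteArraySizeKB * 1024 : data.length;
            long written = 0;
            int offset = 0;
            while (offset < data.length) {
                int writeLen = Math.min(chunkLen, data.length - offset);
                out.write(data, offset, writeLen);  // one bounded piece at a time
                offset += writeLen;
                written += writeLen;
            }
            return written;
        }
    }

In the patch itself the source buffer is native memory, so each chunk is materialized as a separate jbyteArray before being handed to the Java writer; the sketch only shows the chunk-size arithmetic. At the SQL level the default could presumably be overridden per session with CONTROL QUERY DEFAULT HDFS_IO_INTERIM_BYTEARRAY_SIZE_IN_KB '512';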
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
index 0346bef..5ffcd03 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HDFSClient.java
@@ -25,6 +25,7 @@ import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.EOFException;
import java.io.OutputStream;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
@@ -38,6 +39,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.CompressionInputStream;
import java.io.EOFException;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
@@ -53,31 +55,47 @@ import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
//
// To read a range in a Hdfs file, use the constructor
-// public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+// public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length, CompressionInputStream inStream)
//
// For instance methods like hdfsListDirectory use the constructor
// public HDFSClient()
//
// For all static methods use
-// HdfsClient::<static_method_name>
+// HDFSClient::<static_method_name>
//
public class HDFSClient
{
+ // Keep the constants and string array below in sync with
+ // enum CompressionMethod at sql/comexe/ComCompressionInfo.h
+ static final short UNKNOWN_COMPRESSION = 0;
+ static final short UNCOMPRESSED = 1;
+ static final short LZOP = 5;
+ static final String COMPRESSION_TYPE[] = {
+ "UNKNOWN_COMPRESSION", // unable to determine compression method
+ "UNCOMPRESSED", // file is not compressed
+ "LZO_DEFLATE", // using LZO deflate compression
+ "DEFLATE", // using DEFLATE compression
+ "GZIP", // using GZIP compression
+ "LZOP"}; // using LZOP compression
static Logger logger_ = Logger.getLogger(HDFSClient.class.getName());
private static Configuration config_ = null;
private static ExecutorService executorService_ = null;
private static FileSystem defaultFs_ = null;
+ private static CompressionCodecFactory codecFactory_ = null;
private FileSystem fs_ = null;
private int bufNo_;
private int rangeNo_;
- private FSDataInputStream fsdis_;
+ private FSDataInputStream fsdis_;
+ CompressionInputStream inStream_;
private OutputStream outStream_;
private String filename_;
private ByteBuffer buf_;
+ private byte[] bufArray_;
private int bufLen_;
private int bufOffset_ = 0;
private long pos_ = 0;
@@ -89,7 +107,10 @@ public class HDFSClient
private int isEOF_ = 0;
private int totalBytesWritten_ = 0;
private Path filepath_ = null;
- private boolean compression_;
+ boolean compressed_ = false;
+ private CompressionCodec codec_ = null;
+ private short compressionType_;
+ private short ioByteArraySize_;
static {
String confFile = System.getProperty("trafodion.log4j.configFile");
System.setProperty("trafodion.root", System.getenv("TRAF_HOME"));
@@ -105,6 +126,7 @@ public class HDFSClient
catch (IOException ioe) {
throw new RuntimeException("Exception in HDFSClient static block", ioe);
}
+ codecFactory_ = new CompressionCodecFactory(config_);
System.loadLibrary("executor");
}
@@ -125,6 +147,9 @@ public class HDFSClient
{
int bytesRead;
int totalBytesRead = 0;
+ if (compressed_) {
+ bufArray_ = new byte[ioByteArraySize_ * 1024];
+ } else
if (! buf_.hasArray()) {
try {
fsdis_.seek(pos_);
@@ -135,10 +160,14 @@ public class HDFSClient
}
do
{
- if (buf_.hasArray())
- bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_);
- else
- bytesRead = fsdis_.read(buf_);
+ if (compressed_) {
+ bytesRead = compressedFileRead(lenRemain_);
+ } else {
+ if (buf_.hasArray())
+ bytesRead = fsdis_.read(pos_, buf_.array(), bufOffset_, lenRemain_);
+ else
+ bytesRead = fsdis_.read(buf_);
+ }
if (bytesRead == -1) {
isEOF_ = 1;
break;
@@ -151,10 +180,38 @@ public class HDFSClient
bufOffset_ += bytesRead;
pos_ += bytesRead;
lenRemain_ -= bytesRead;
- } while (lenRemain_ > 0);
+ } while (lenRemain_ > 0);
return new Integer(totalBytesRead);
}
- }
+ }
+
+ int compressedFileRead(int readLenRemain) throws IOException
+ {
+ int totalReadLen = 0;
+ int readLen;
+ int offset = 0;
+ int retcode;
+
+ int lenRemain = ((readLenRemain > bufArray_.length) ? bufArray_.length : readLenRemain);
+ do
+ {
+ readLen = inStream_.read(bufArray_, offset, lenRemain);
+ if (readLen == -1 || readLen == 0)
+ break;
+ totalReadLen += readLen;
+ offset += readLen;
+ lenRemain -= readLen;
+ } while (lenRemain > 0);
+ if (totalReadLen > 0) {
+ if ((retcode = copyToByteBuffer(buf_, bufOffset_, bufArray_, totalReadLen)) != 0)
+ throw new IOException("Failure to copy to the DirectByteBuffer in the native layer with error code " + retcode);
+ }
+ else
+ totalReadLen = -1;
+ return totalReadLen;
+ }
+
+ native int copyToByteBuffer(ByteBuffer buf, int bufOffset, byte[] bufArray, int copyLen);
public HDFSClient()
{
@@ -166,14 +223,30 @@ public class HDFSClient
// The passed in length can never be more than the size of the buffer
// If the range has a length more than the buffer length, the range is chunked
// in HdfsScan
- public HDFSClient(int bufNo, int rangeNo, String filename, ByteBuffer buffer, long position, int length) throws IOException
+ public HDFSClient(int bufNo, short ioByteArraySize, int rangeNo, String filename, ByteBuffer buffer, long position,
+ int length, short compressionType, CompressionInputStream inStream) throws IOException
{
bufNo_ = bufNo;
rangeNo_ = rangeNo;
filename_ = filename;
+ ioByteArraySize_ = ioByteArraySize;
filepath_ = new Path(filename_);
fs_ = FileSystem.get(filepath_.toUri(),config_);
- fsdis_ = fs_.open(filepath_);
+ compressionType_ = compressionType;
+ inStream_ = inStream;
+ codec_ = codecFactory_.getCodec(filepath_);
+ if (codec_ != null) {
+ compressed_ = true;
+ if (inStream_ == null)
+ inStream_ = codec_.createInputStream(fs_.open(filepath_));
+ }
+ else {
+ if ((compressionType_ != UNCOMPRESSED) && (compressionType_ != UNKNOWN_COMPRESSION))
+ throw new IOException(COMPRESSION_TYPE[compressionType_] + " compression codec is not configured in Hadoop");
+ if (filename_.endsWith(".lzo"))
+ throw new IOException(COMPRESSION_TYPE[LZOP] + " compression codec is not configured in Hadoop");
+ fsdis_ = fs_.open(filepath_);
+ }
blockSize_ = (int)fs_.getDefaultBlockSize(filepath_);
buf_ = buffer;
bufOffset_ = 0;
@@ -201,7 +274,8 @@ public class HDFSClient
int bytesRead;
retObject = (Integer)future_.get();
bytesRead = retObject.intValue();
- fsdis_.close();
+ if (! compressed_)
+ fsdis_.close();
fsdis_ = null;
return bytesRead;
}
@@ -226,7 +300,7 @@ public class HDFSClient
filepath_ = new Path(fname + ".gz");
fs_ = FileSystem.get(filepath_.toUri(),config_);
- compression_ = compress;
+ compressed_ = compress;
fsdis_ = null;
FSDataOutputStream fsOut;
if (overwrite)
@@ -237,7 +311,7 @@ public class HDFSClient
else
fsOut = fs_.create(filepath_);
- if (compression_) {
+ if (compressed_) {
GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
@@ -256,7 +330,7 @@ public class HDFSClient
else
filepath_ = new Path(fname + ".gz");
fs_ = FileSystem.get(filepath_.toUri(),config_);
- compression_ = compress;
+ compressed_ = compress;
outStream_ = null;
fsdis_ = null;
return true;
@@ -274,7 +348,7 @@ public class HDFSClient
else
fsOut = fs_.create(filepath_);
- if (compression_) {
+ if (compressed_) {
GzipCodec gzipCodec = (GzipCodec) ReflectionUtils.newInstance( GzipCodec.class, config_);
Compressor gzipCompressor = CodecPool.getCompressor(gzipCodec);
outStream_= gzipCodec.createOutputStream(fsOut, gzipCompressor);
@@ -285,21 +359,23 @@ public class HDFSClient
logger_.debug("HDFSClient.hdfsWrite() - output stream created" );
}
outStream_.write(buff);
- if (outStream_ instanceof FSDataOutputStream)
- totalBytesWritten_ = ((FSDataOutputStream)outStream_).size();
- else
- totalBytesWritten_ += buff.length;
if (logger_.isDebugEnabled())
- logger_.debug("HDFSClient.hdfsWrite() - bytes written " + totalBytesWritten_ );
- return totalBytesWritten_;
+ logger_.debug("HDFSClient.hdfsWrite() - bytes written " + buff.length);
+ return buff.length;
}
int hdfsRead(ByteBuffer buffer) throws IOException
{
if (logger_.isDebugEnabled())
logger_.debug("HDFSClient.hdfsRead() - started" );
- if (fsdis_ == null) {
- fsdis_ = fs_.open(filepath_);
+ if (fsdis_ == null && inStream_ == null ) {
+ codec_ = codecFactory_.getCodec(filepath_);
+ if (codec_ != null) {
+ compressed_ = true;
+ inStream_ = codec_.createInputStream(fs_.open(filepath_));
+ }
+ else
+ fsdis_ = fs_.open(filepath_);
pos_ = 0;
}
int lenRemain;
@@ -307,6 +383,8 @@ public class HDFSClient
int totalBytesRead = 0;
int bufLen;
int bufOffset = 0;
+ if (compressed_ && bufArray_ != null)
+ bufArray_ = new byte[ioByteArraySize_ * 1024];
if (buffer.hasArray())
bufLen = buffer.array().length;
else
@@ -314,10 +392,14 @@ public class HDFSClient
lenRemain = bufLen;
do
{
- if (buffer.hasArray())
- bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, lenRemain);
- else
- bytesRead = fsdis_.read(buffer);
+ if (compressed_) {
+ bytesRead = compressedFileRead(lenRemain);
+ } else {
+ if (buffer.hasArray())
+ bytesRead = fsdis_.read(pos_, buffer.array(), bufOffset, lenRemain);
+ else
+ bytesRead = fsdis_.read(buffer);
+ }
if (bytesRead == -1 || bytesRead == 0)
break;
totalBytesRead += bytesRead;
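One reason for the interim byte array in the read path above: CompressionInputStream is a plain InputStream, so it can only read into a byte[], while the scan buffers are direct ByteBuffers with no backing array. The patch copies the byte[] into the direct buffer natively via copyToByteBuffer(); a pure-Java equivalent, shown here only to illustrate the data flow and not the patch's actual code, would be:

    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.ByteBuffer;

    // Pure-Java analogue of the copy done natively by copyToByteBuffer(); illustrative only.
    class DecompressToDirectBufferSketch {
        static int readInto(InputStream in, ByteBuffer directBuf, byte[] interim) throws IOException {
            int n = in.read(interim, 0, Math.min(interim.length, directBuf.remaining()));
            if (n > 0)
                directBuf.put(interim, 0, n);  // ByteBuffer.put copies into the direct buffer
            return n;
        }
    }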
http://git-wip-us.apache.org/repos/asf/trafodion/blob/96cab4dd/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
----------------------------------------------------------------------
diff --git a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
index 99f021d..b438fb2 100644
--- a/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
+++ b/core/sql/src/main/java/org/trafodion/sql/HdfsScan.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.io.compress.CompressionInputStream;
import java.net.URI;
// This class implements an efficient mechanism to read hdfs files
@@ -72,6 +73,9 @@ public class HdfsScan
private long currRangeLenRemain_;
private int lastBufCompleted_ = -1;
private boolean scanCompleted_;
+ private CompressionInputStream currInStream_;
+ private short ioByteArraySize_;
+
// Structure to hold the Scan ranges for this HdfsScan instance
//
@@ -82,13 +86,15 @@ public class HdfsScan
long pos_;
long len_;
int tdbRangeNum_;
+ short compressionType_;
- HdfsScanRange(String filename, long pos, long len, int tdbRangeNum)
+ HdfsScanRange(String filename, long pos, long len, int tdbRangeNum, short compressionType)
{
filename_ = filename;
pos_ = pos;
len_ = len;
tdbRangeNum_ = tdbRangeNum;
+ compressionType_ = compressionType;
}
}
@@ -103,7 +109,8 @@ public class HdfsScan
{
}
- public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, String filename[], long pos[], long len[], int rangeNum[]) throws IOException
+ public void setScanRanges(ByteBuffer buf1, ByteBuffer buf2, short ioByteArraySize, String filename[], long pos[],
+ long len[], int rangeNum[], short compressionType[]) throws IOException
{
// Two buffers to hold the data read
buf_ = new ByteBuffer[2];
@@ -111,7 +118,7 @@ public class HdfsScan
buf_[0] = buf1;
buf_[1] = buf2;
-
+ ioByteArraySize_ = ioByteArraySize;
for (int i = 0; i < 2 ; i++) {
if (buf_[i].hasArray())
bufLen_[i] = buf_[i].array().length;
@@ -121,12 +128,13 @@ public class HdfsScan
hdfsClient_ = new HDFSClient[2];
hdfsScanRanges_ = new HdfsScanRange[filename.length];
for (int i = 0; i < filename.length; i++) {
- hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i]);
+ hdfsScanRanges_[i] = new HdfsScanRange(filename[i], pos[i], len[i], rangeNum[i], compressionType[i]);
}
if (hdfsScanRanges_.length > 0) {
currRange_ = 0;
currRangePos_ = hdfsScanRanges_[currRange_].pos_;
currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_;
+ currInStream_ = null;
scheduleHdfsScanRange(0, 0);
}
scanCompleted_ = false;
@@ -146,6 +154,9 @@ public class HdfsScan
currRange_++;
currRangePos_ = hdfsScanRanges_[currRange_].pos_;
currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_;
+ if (currInStream_ != null)
+ currInStream_.close();
+ currInStream_ = null;
}
}
if (currRangeLenRemain_ > bufLen_[bufNo])
@@ -155,7 +166,11 @@ public class HdfsScan
if (! scanCompleted_) {
if (logger_.isDebugEnabled())
logger_.debug(" CurrentRange " + hdfsScanRanges_[currRange_].tdbRangeNum_ + " LenRemain " + currRangeLenRemain_ + " BufNo " + bufNo);
- hdfsClient_[bufNo] = new HDFSClient(bufNo, hdfsScanRanges_[currRange_].tdbRangeNum_, hdfsScanRanges_[currRange_].filename_, buf_[bufNo], currRangePos_, readLength);
+ hdfsClient_[bufNo] = new HDFSClient(bufNo, ioByteArraySize_, hdfsScanRanges_[currRange_].tdbRangeNum_,
+ hdfsScanRanges_[currRange_].filename_,
+ buf_[bufNo], currRangePos_, readLength,
+ hdfsScanRanges_[currRange_].compressionType_, currInStream_);
+
}
}
@@ -188,6 +203,7 @@ public class HdfsScan
bufNo = 0;
rangeNo = hdfsClient_[0].getRangeNo();
isEOF = hdfsClient_[0].isEOF();
+ currInStream_ = hdfsClient_[0].inStream_;
break;
case 0:
// Wait for the read to complete in buffer 1
@@ -195,6 +211,7 @@ public class HdfsScan
bufNo = 1;
rangeNo = hdfsClient_[1].getRangeNo();
isEOF = hdfsClient_[1].isEOF();
+ currInStream_ = hdfsClient_[1].inStream_;
break;
default:
bufNo = -1;
@@ -218,6 +235,9 @@ public class HdfsScan
currRangePos_ = hdfsScanRanges_[currRange_].pos_;
currRangeLenRemain_ = hdfsScanRanges_[currRange_].len_;
bytesRead = 0;
+ if (currInStream_ != null)
+ currInStream_.close();
+ currInStream_ = null;
}
}
switch (lastBufCompleted_)
@@ -278,6 +298,7 @@ public class HdfsScan
long pos[] = new long[file_status.length * split];
long len[] = new long[file_status.length * split];
int range[] = new int[file_status.length * split];
+ short compress[] = new short[file_status.length * split];
for (int i = 0 ; i < file_status.length * split; i++) {
Path filePath = file_status[i].getPath();
long fileLen = file_status[i].getLen();
@@ -293,13 +314,14 @@ public class HdfsScan
range[i] = i;
if (j == (split-1))
len[i] = fileLen - (splitLen *(j));
+ compress[i] = 1; // Uncompressed
System.out.println ("Range " + i + " Pos " + pos[i] + " Length " + len[i]);
i++;
}
}
long time1 = System.currentTimeMillis();
HdfsScan hdfsScan = new HdfsScan();
- hdfsScan.setScanRanges(buf1, buf2, fileName, pos, len, range);
+ hdfsScan.setScanRanges(buf1, buf2, (short)512, fileName, pos, len, range, compress);
int[] retArray;
int bytesCompleted;
ByteBuffer buf;