Posted to commits@trafodion.apache.org by se...@apache.org on 2018/03/02 06:57:20 UTC

[3/4] trafodion git commit: [TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables

[TRAFODION-2917] Refactor Trafodion implementation of hdfs scan for text formatted hive tables

Removed the dependency on libhdfs from some more portions of the Trafodion code.

Also introduced HDFS Scan, which uses the Java layer to read HDFS data and provides the following features:
a) Prefetch the data using a double-buffering scheme
b) Avoid unnecessary data copies
c) Ensure that the read into one buffer in the Java layer is initiated while the data in the other buffer is being processed on the native side, without any hiccups (see the sketch after this list)
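
A minimal Java sketch of the double-buffering idea follows. The class and method names (DoubleBufferedScan, nextFilledBuffer, fill) are hypothetical and only illustrate the concept; the actual implementation is in org.trafodion.sql.HdfsScan and is driven from the native side through JNI.

    import java.io.InputStream;
    import java.nio.ByteBuffer;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical illustration only; the real logic lives in org.trafodion.sql.HdfsScan.
    class DoubleBufferedScan {
        private final ByteBuffer[] buf = new ByteBuffer[2]; // buffers shared with the native side
        private final ExecutorService io = Executors.newSingleThreadExecutor();
        private Future<Integer> inFlight;                   // read currently in progress
        private int next = 0;                               // index of the buffer being filled

        DoubleBufferedScan(ByteBuffer b0, ByteBuffer b1, InputStream in) {
            buf[0] = b0;
            buf[1] = b1;
            final int idx = next;
            inFlight = io.submit(() -> fill(in, buf[idx])); // kick off the first read
        }

        // Returns the index of a buffer now full of data for the native side to
        // process, and immediately starts prefetching into the other buffer so
        // that one read is always in flight while the other buffer is consumed.
        int nextFilledBuffer(InputStream in) throws Exception {
            int bytesRead = inFlight.get();                 // wait for the in-flight read
            int ready = next;
            next = 1 - next;
            final int idx = next;
            inFlight = io.submit(() -> fill(in, buf[idx]));
            return bytesRead > 0 ? ready : -1;              // -1 signals end of data
        }

        // Reads one chunk and copies it into the shared buffer. The real code
        // reads straight into a direct ByteBuffer to avoid this extra copy
        // (feature b above).
        private int fill(InputStream in, ByteBuffer b) throws Exception {
            byte[] chunk = new byte[b.capacity()];
            int n = in.read(chunk);
            b.clear();
            if (n > 0)
                b.put(chunk, 0, n);
            return n;
        }
    }

While the native side consumes buf[ready], the single I/O thread is already filling the other buffer, which is what feature c) above describes.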

This HDFS Scan implementation is still being tested and hence is turned off by default.

Also changed the code to obtain millisecond-resolution modification timestamps for HDFS files.
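
On the timestamp change: the Hadoop Java API reports modification times in milliseconds (org.apache.hadoop.fs.FileStatus.getModificationTime()), while the libhdfs hdfsFileInfo.mLastMod field only carries second resolution. The sketch below just shows where the millisecond value comes from; it is not the getHiveTableMaxModificationTs() method this commit adds to HDFSClient.java.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ModTimeMillis {
        // Prints the millisecond-resolution modification time of an HDFS path.
        public static void main(String[] args) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            FileStatus status = fs.getFileStatus(new Path(args[0]));
            System.out.println("mtime (ms since epoch): " + status.getModificationTime());
        }
    }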


Project: http://git-wip-us.apache.org/repos/asf/trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/trafodion/commit/e303b3a0
Tree: http://git-wip-us.apache.org/repos/asf/trafodion/tree/e303b3a0
Diff: http://git-wip-us.apache.org/repos/asf/trafodion/diff/e303b3a0

Branch: refs/heads/master
Commit: e303b3a083154779bcde8a84e1e2abff12d365e8
Parents: ac70660
Author: selvaganesang <se...@esgyn.com>
Authored: Wed Feb 28 22:25:43 2018 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Thu Mar 1 22:30:05 2018 +0000

----------------------------------------------------------------------
 core/sql/cli/Context.cpp                        |   2 +-
 core/sql/cli/Context.h                          |   6 +
 core/sql/comexe/ComTdbBlockingHdfsScan.h        |   1 -
 core/sql/comexe/ComTdbHdfsScan.h                |   8 +-
 core/sql/common/ComUser.cpp                     |   2 +-
 core/sql/common/Platform.h                      |   2 +
 core/sql/executor/ExExeUtil.h                   |   1 -
 core/sql/executor/ExExeUtilGet.cpp              |   5 +-
 core/sql/executor/ExExeUtilGetStats.cpp         |   1 -
 core/sql/executor/ExExeUtilLoad.cpp             |  25 +-
 core/sql/executor/ExFastTransport.cpp           |  47 +-
 core/sql/executor/ExHbaseAccess.cpp             |  11 +-
 core/sql/executor/ExHbaseIUD.cpp                |   2 +-
 core/sql/executor/ExHdfsScan.cpp                | 150 +++----
 core/sql/executor/ExHdfsScan.h                  |   1 +
 core/sql/executor/ExSMCommon.cpp                |   1 +
 core/sql/executor/ExSMCommon.h                  |   3 +-
 core/sql/executor/ExSMGlobals.cpp               |   1 +
 core/sql/executor/ExSMQueue.cpp                 |   2 +-
 core/sql/executor/ExSMReader.cpp                |   1 +
 core/sql/executor/ExSMShortMessage.cpp          |   2 +-
 core/sql/executor/ExSMTaskList.cpp              |   2 +-
 core/sql/executor/HdfsClient_JNI.cpp            | 436 +++++++++++++++----
 core/sql/executor/HdfsClient_JNI.h              |  74 +++-
 core/sql/executor/JavaObjectInterface.cpp       |   3 +-
 core/sql/executor/JavaObjectInterface.h         |  12 +-
 core/sql/executor/OrcFileReader.h               |   2 +-
 core/sql/executor/SequenceFileReader.h          |   4 +-
 core/sql/exp/ExpLOBaccess.cpp                   | 143 ++----
 core/sql/exp/ExpLOBaccess.h                     |   8 -
 core/sql/generator/GenFastTransport.cpp         |   4 +-
 core/sql/generator/GenRelScan.cpp               |  17 +-
 core/sql/nskgmake/tdm_sqlexp/Makefile           |   2 +-
 core/sql/optimizer/HDFSHook.cpp                 |  34 +-
 core/sql/optimizer/HDFSHook.h                   |  16 +-
 core/sql/optimizer/OptimizerSimulator.cpp       |   1 -
 core/sql/optimizer/RelScan.h                    |   3 +-
 core/sql/regress/hive/EXPECTED007               |   2 +-
 core/sql/regress/hive/EXPECTED018               |   2 +-
 core/sql/regress/hive/EXPECTED040               |   8 +-
 .../main/java/org/trafodion/sql/HDFSClient.java | 294 +++++++++++--
 .../main/java/org/trafodion/sql/HdfsScan.java   |  95 ++--
 42 files changed, 971 insertions(+), 465 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/cli/Context.cpp
----------------------------------------------------------------------
diff --git a/core/sql/cli/Context.cpp b/core/sql/cli/Context.cpp
index 38622ae..cdf5909 100644
--- a/core/sql/cli/Context.cpp
+++ b/core/sql/cli/Context.cpp
@@ -60,7 +60,6 @@
 #include "ComUser.h"
 #include "CmpSeabaseDDLauth.h"
 
-#include "hdfs.h"
 #include "StmtCompilationMode.h"
 
 #include "ExCextdecs.h"
@@ -165,6 +164,7 @@ ContextCli::ContextCli(CliGlobals *cliGlobals)
     jniErrorStr_(&exHeap_),
     hbaseClientJNI_(NULL),
     hiveClientJNI_(NULL),
+    hdfsClientJNI_(NULL),
     arkcmpArray_(&exHeap_),
     cmpContextInfo_(&exHeap_),
     cmpContextInUse_(&exHeap_),

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/cli/Context.h
----------------------------------------------------------------------
diff --git a/core/sql/cli/Context.h b/core/sql/cli/Context.h
index bcfd06b..f2da6c8 100644
--- a/core/sql/cli/Context.h
+++ b/core/sql/cli/Context.h
@@ -62,6 +62,7 @@
 #include "ExpSeqGen.h"
 #include "ssmpipc.h"
 #include "hdfs.h"
+#include "HdfsClient_JNI.h"
 
 class CliGlobals;
 class HashQueue;
@@ -200,6 +201,11 @@ public:
   HiveClient_JNI *getHiveClient() { return hiveClientJNI_; }
   void setHiveClient(HiveClient_JNI *hiveClientJNI)
   { hiveClientJNI_ = hiveClientJNI; }
+
+  HdfsClient *getHDFSClient() { return hdfsClientJNI_; }
+  void setHDFSClient(HdfsClient *hdfsClientJNI)
+  { hdfsClientJNI_ = hdfsClientJNI; }
+
   //expose cmpContextInfo_ to get HQC info of different contexts
   const NAArray<CmpContextInfo *> & getCmpContextInfo() const { return cmpContextInfo_; }
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/comexe/ComTdbBlockingHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbBlockingHdfsScan.h b/core/sql/comexe/ComTdbBlockingHdfsScan.h
index 88a3b40..fe0b980 100755
--- a/core/sql/comexe/ComTdbBlockingHdfsScan.h
+++ b/core/sql/comexe/ComTdbBlockingHdfsScan.h
@@ -24,7 +24,6 @@
 #define COM_HDFS_SCAN_H
 
 #include "ComTdb.h"
-#include "hdfs.h"  // tPort 
 
 //
 // Task Definition Block

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/comexe/ComTdbHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHdfsScan.h b/core/sql/comexe/ComTdbHdfsScan.h
index 46d7f2f..ea995fb 100755
--- a/core/sql/comexe/ComTdbHdfsScan.h
+++ b/core/sql/comexe/ComTdbHdfsScan.h
@@ -55,7 +55,8 @@ class ComTdbHdfsScan : public ComTdb
     LOG_ERROR_ROWS              = 0x0040,
     ASSIGN_RANGES_AT_RUNTIME    = 0x0080,
     TREAT_EMPTY_AS_NULL         = 0x0100,
-    USE_LIBHDFS_SCAN            = 0x0200
+    USE_LIBHDFS_SCAN            = 0x0200,
+    COMPRESSED_FILE             = 0x0400
   };
 
   // Expression to filter rows.
@@ -290,6 +291,11 @@ public:
   NABoolean getUseLibhdfsScan() const
                                 { return (flags_ & USE_LIBHDFS_SCAN) != 0; }
 
+  void setCompressedFile(NABoolean v)
+  {(v ? flags_ |= COMPRESSED_FILE : flags_ &= ~COMPRESSED_FILE); }
+  NABoolean isCompressedFile() const
+                                { return (flags_ & COMPRESSED_FILE) != 0; }
+
   UInt32 getMaxErrorRows() const { return maxErrorRows_;}
   void setMaxErrorRows(UInt32 v ) { maxErrorRows_= v; }
   

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/common/ComUser.cpp
----------------------------------------------------------------------
diff --git a/core/sql/common/ComUser.cpp b/core/sql/common/ComUser.cpp
index 796d94b..2b6831f 100644
--- a/core/sql/common/ComUser.cpp
+++ b/core/sql/common/ComUser.cpp
@@ -72,7 +72,7 @@ Int32 ComUser::getCurrentUser(void)
                                NULL, 0, NULL);
 
   assert(rc >= 0);
-  assert(dbUserID >= SUPER_USER)
+  assert(dbUserID >= SUPER_USER);
 
   return dbUserID;
   

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/common/Platform.h
----------------------------------------------------------------------
diff --git a/core/sql/common/Platform.h b/core/sql/common/Platform.h
index fe7598a..153f9b9 100644
--- a/core/sql/common/Platform.h
+++ b/core/sql/common/Platform.h
@@ -35,6 +35,8 @@
  *
  *****************************************************************************
  */
+#define __STDC_LIMIT_MACROS
+#define __STDC_FORMAT_MACROS
 
 /*
 // On Linux, either NA_BIG_ENDIAN or NA_LITTLE_ENDIAN may have already

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExExeUtil.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtil.h b/core/sql/executor/ExExeUtil.h
index e119ed1..85173c8 100644
--- a/core/sql/executor/ExExeUtil.h
+++ b/core/sql/executor/ExExeUtil.h
@@ -3893,7 +3893,6 @@ class ExExeUtilHBaseBulkUnLoadTcb : public ExExeUtilTcb
   NABoolean emptyTarget_;
   NABoolean oneFile_;
   ExpHbaseInterface * ehi_;
-  HdfsClient *hdfsClient_;
 };
 
 class ExExeUtilHbaseUnLoadPrivateState : public ex_tcb_private_state

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExExeUtilGet.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilGet.cpp b/core/sql/executor/ExExeUtilGet.cpp
index cd20fae..ff40c40 100644
--- a/core/sql/executor/ExExeUtilGet.cpp
+++ b/core/sql/executor/ExExeUtilGet.cpp
@@ -69,7 +69,6 @@
 
 #include "ExpHbaseInterface.h"
 #include "sql_buffer_size.h"
-#include "hdfs.h"
 
 #include "NAType.h"
 
@@ -6873,8 +6872,8 @@ ExExeUtilClusterStatsTcb::ExExeUtilClusterStatsTcb(
   stats_ = (ComTdbClusterStatsVirtTableColumnStruct*)statsBuf_;
 
   ehi_ = ExpHbaseInterface::newInstance(glob->getDefaultHeap(),
-					(char*)"", //exe_util_tdb.server(), 
-					(char*)""); //exe_util_tdb.zkPort());
+					(char*)"", 
+					(char*)""); 
   regionInfoList_ = NULL;
   
   // get hbase rootdir location. Max linux pathlength is 1024.

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExExeUtilGetStats.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilGetStats.cpp b/core/sql/executor/ExExeUtilGetStats.cpp
index 72bdd2b..9e99b5c 100644
--- a/core/sql/executor/ExExeUtilGetStats.cpp
+++ b/core/sql/executor/ExExeUtilGetStats.cpp
@@ -44,7 +44,6 @@
 #include  "ComTdb.h"
 #include  "ex_tcb.h"
 #include  "ComSqlId.h"
-
 #include  "ExExeUtil.h"
 #include  "ex_exe_stmt_globals.h"
 #include  "exp_expr.h"

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExExeUtilLoad.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExExeUtilLoad.cpp b/core/sql/executor/ExExeUtilLoad.cpp
index 115f2a4..97c185c 100644
--- a/core/sql/executor/ExExeUtilLoad.cpp
+++ b/core/sql/executor/ExExeUtilLoad.cpp
@@ -1960,7 +1960,7 @@ ex_tcb * ExExeUtilHBaseBulkUnLoadTdb::build(ex_globals * glob)
 void ExExeUtilHBaseBulkUnLoadTcb::createHdfsFileError(Int32 hdfsClientRetCode)
 {
   ComDiagsArea * diagsArea = NULL;
-  char* errorMsg = hdfsClient_->getErrorText((HDFS_Client_RetCode)hdfsClientRetCode);
+  char* errorMsg = HdfsClient::getErrorText((HDFS_Client_RetCode)hdfsClientRetCode);
   ExRaiseSqlError(getHeap(), &diagsArea, (ExeErrorCode)(8447), NULL,
                   NULL, NULL, NULL, errorMsg, (char *)GetCliGlobals()->currContext()->getJniErrorStr().data());
   ex_queue_entry *pentry_up = qparent_.up->getTailEntry();
@@ -1980,7 +1980,6 @@ ExExeUtilHBaseBulkUnLoadTcb::ExExeUtilHBaseBulkUnLoadTcb(
        emptyTarget_(FALSE),
        oneFile_(FALSE)
 {
-  hdfsClient_ = NULL;
   ehi_ = ExpHbaseInterface::newInstance(getGlobals()->getDefaultHeap(),
                                    (char*)"", //Later may need to change to hblTdb.server_,
                                    (char*)""); //Later may need to change to hblTdb.zkPort_);
@@ -2005,12 +2004,6 @@ void ExExeUtilHBaseBulkUnLoadTcb::freeResources()
     NADELETEBASIC (snapshotsList_, getMyHeap());
     snapshotsList_ = NULL;
   }
-
-  if (hdfsClient_)
-  {
-    NADELETE(hdfsClient_, HdfsClient, getMyHeap());
-    hdfsClient_ = NULL;
-  }
   NADELETE(ehi_, ExpHbaseInterface, getGlobals()->getDefaultHeap());
   ehi_ = NULL;
 }
@@ -2192,16 +2185,6 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
       }
       setEmptyTarget(hblTdb().getEmptyTarget());
       setOneFile(hblTdb().getOneFile());
-      if (!hdfsClient_)
-      {
-        hdfsClient_ = HdfsClient::newInstance((NAHeap *)getMyHeap(), hdfsClientRetCode);
-        if (hdfsClientRetCode != HDFS_CLIENT_OK)
-        {
-          createHdfsFileError(hdfsClientRetCode);
-          step_ = UNLOAD_END_ERROR_;
-          break;
-        }
-      }
       if ((retcode = ehi_->init(NULL)) != HBASE_ACCESS_SUCCESS)
       {
          ExHbaseAccessTcb::setupError((NAHeap *)getMyHeap(),qparent_, retcode, 
@@ -2213,7 +2196,7 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
       if (!hblTdb().getOverwriteMergeFile() &&  hblTdb().getMergePath() != NULL)
       {
         NABoolean exists = FALSE;
-        hdfsClientRetCode = hdfsClient_->hdfsExists( hblTdb().getMergePath(), exists);
+        hdfsClientRetCode = HdfsClient::hdfsExists( hblTdb().getMergePath(), exists);
         if (hdfsClientRetCode != HDFS_CLIENT_OK)
         {
           createHdfsFileError(hdfsClientRetCode);
@@ -2298,7 +2281,7 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
 
       NAString uldPath ( hblTdb().getExtractLocation());
 
-      hdfsClientRetCode = hdfsClient_->hdfsCleanUnloadPath( uldPath);
+      hdfsClientRetCode = HdfsClient::hdfsCleanUnloadPath( uldPath);
       if (hdfsClientRetCode != HDFS_CLIENT_OK)
       {
         createHdfsFileError(hdfsClientRetCode);
@@ -2443,7 +2426,7 @@ short ExExeUtilHBaseBulkUnLoadTcb::work()
 
       NAString srcPath ( hblTdb().getExtractLocation());
       NAString dstPath ( hblTdb().getMergePath());
-      hdfsClientRetCode = hdfsClient_->hdfsMergeFiles( srcPath, dstPath);
+      hdfsClientRetCode = HdfsClient::hdfsMergeFiles( srcPath, dstPath);
       if (hdfsClientRetCode != HDFS_CLIENT_OK)
       {
         createHdfsFileError(hdfsClientRetCode);

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExFastTransport.cpp b/core/sql/executor/ExFastTransport.cpp
index ba4cfbb..d2cbd47 100644
--- a/core/sql/executor/ExFastTransport.cpp
+++ b/core/sql/executor/ExFastTransport.cpp
@@ -790,7 +790,7 @@ ExWorkProcRetcode ExHdfsFastExtractTcb::work()
           }
           else if (!isSequenceFile() && hdfsClient_ == NULL)
           {
-             hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetCode);
+             hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetCode);
              if (hdfsClientRetCode != HDFS_CLIENT_OK)
              {
                 createHdfsClientFileError(hdfsClientRetCode);
@@ -813,25 +813,30 @@ ExWorkProcRetcode ExHdfsFastExtractTcb::work()
           }
           else
           {
-            hdfsClientRetCode = hdfsClient_->hdfsCreate(targetLocation_, isHdfsCompressed());
-            if (hdfsClientRetCode != HDFS_CLIENT_OK)
-            {
-              createHdfsClientFileError(hdfsClientRetCode);
-              pstate.step_ = EXTRACT_ERROR;
-              break;
-            }
-          }  
-          if (feStats)
-          {
-            feStats->setPartitionNumber(fileNum);
-          }
-        }
-      else
-      {
-          updateWorkATPDiagsArea(__FILE__,__LINE__,"sockets are not supported");
-          pstate.step_ = EXTRACT_ERROR;
-          break;
-      }
+             hdfsClientRetCode = hdfsClient_->hdfsOpen(targetLocation_, isHdfsCompressed());
+             if (hdfsClientRetCode != HDFS_CLIENT_OK)
+             {
+                createHdfsClientFileError(hdfsClientRetCode);
+                NADELETE(hdfsClient_,
+                       HdfsClient,
+                       heap_);
+                hdfsClient_ = NULL;
+                pstate.step_ = EXTRACT_ERROR;
+                break;
+             }
+           }
+           if (feStats)
+           {
+             feStats->setPartitionNumber(fileNum);
+           }
+       }
+       else
+       {
+           updateWorkATPDiagsArea(__FILE__,__LINE__,"sockets are not supported");
+           pstate.step_ = EXTRACT_ERROR;
+           break;
+       }
+
       for (UInt32 i = 0; i < myTdb().getChildTuple()->numAttrs(); i++)
       {
         Attributes * attr = myTdb().getChildTableAttr(i);
@@ -1041,7 +1046,7 @@ ExWorkProcRetcode ExHdfsFastExtractTcb::work()
         }
       else
         {
-          hdfsClientRetCode = hdfsClient_->hdfsWrite(currBuffer_->data_, bytesToWrite);
+          hdfsClient_->hdfsWrite(currBuffer_->data_, bytesToWrite, hdfsClientRetCode);
           if (hdfsClientRetCode != HDFS_CLIENT_OK)
           {
             createSequenceFileError(hdfsClientRetCode);

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExHbaseAccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseAccess.cpp b/core/sql/executor/ExHbaseAccess.cpp
index 2182f9a..8af9308 100644
--- a/core/sql/executor/ExHbaseAccess.cpp
+++ b/core/sql/executor/ExHbaseAccess.cpp
@@ -2991,7 +2991,8 @@ short ExHbaseAccessTcb::createDirectRowBuffer( UInt16 tuppIndex,
     {
       // Overwrite trailing delimiter with newline.
       hiveBuff[hiveBuffInx-1] = '\n';
-      sampleFileHdfsClient()->hdfsWrite(hiveBuff, hiveBuffInx);
+      HDFS_Client_RetCode hdfsClientRetcode;
+      sampleFileHdfsClient()->hdfsWrite(hiveBuff, hiveBuffInx, hdfsClientRetcode);
     }
   return 0;
 }
@@ -3263,16 +3264,16 @@ void ExHbaseAccessTcb::handleException(NAHeap *heap,
      return;
 
   if (!loggingFileCreated_) {
-     logFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode);
+     logFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetcode);
      if (hdfsClientRetcode == HDFS_CLIENT_OK)
-        hdfsClientRetcode = logFileHdfsClient_->hdfsCreate(loggingFileName_, FALSE);
+        hdfsClientRetcode = logFileHdfsClient_->hdfsCreate(loggingFileName_, TRUE, FALSE);
      if (hdfsClientRetcode == HDFS_CLIENT_OK)
         loggingFileCreated_ = TRUE;
      else 
         goto logErrorReturn;
   }
   
-  hdfsClientRetcode = logFileHdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen);
+  logFileHdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen, hdfsClientRetcode);
   if (hdfsClientRetcode != HDFS_CLIENT_OK) 
      goto logErrorReturn;
   if (errorCond != NULL) {
@@ -3288,7 +3289,7 @@ void ExHbaseAccessTcb::handleException(NAHeap *heap,
      errorMsg = (char *)"[UNKNOWN EXCEPTION]\n";
      errorMsgLen = strlen(errorMsg);
   }
-  hdfsClientRetcode = logFileHdfsClient_->hdfsWrite(errorMsg, errorMsgLen);
+  logFileHdfsClient_->hdfsWrite(errorMsg, errorMsgLen, hdfsClientRetcode);
 logErrorReturn:
   if (hdfsClientRetcode != HDFS_CLIENT_OK) {
      loggingErrorDiags_ = ComDiagsArea::allocate(heap);

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExHbaseIUD.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp
index e8aac8c..6a52457 100644
--- a/core/sql/executor/ExHbaseIUD.cpp
+++ b/core/sql/executor/ExHbaseIUD.cpp
@@ -1421,7 +1421,7 @@ ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work()
             HDFS_Client_RetCode hdfsClientRetcode;
             samplePath.append(filePart);
             if (sampleFileHdfsClient_ == NULL)
-                sampleFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode);
+                sampleFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetcode);
             if (hdfsClientRetcode == HDFS_CLIENT_OK) {
                 hdfsClientRetcode = sampleFileHdfsClient_->hdfsOpen(samplePath.data(), FALSE);
                 if (hdfsClientRetcode != HDFS_CLIENT_OK) {

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExHdfsScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.cpp b/core/sql/executor/ExHdfsScan.cpp
index 3ff153e..a56cb06 100644
--- a/core/sql/executor/ExHdfsScan.cpp
+++ b/core/sql/executor/ExHdfsScan.cpp
@@ -124,11 +124,12 @@ ExHdfsScanTcb::ExHdfsScanTcb(
   , hdfsScan_(NULL)
   , hdfsStats_(NULL)
   , hdfsFileInfoListAsArray_(glob->getDefaultHeap(), hdfsScanTdb.getHdfsFileInfoList()->numEntries())
+  
 {
   Space * space = (glob ? glob->getSpace() : 0);
   CollHeap * heap = (glob ? glob->getDefaultHeap() : 0);
   useLibhdfsScan_ = hdfsScanTdb.getUseLibhdfsScan();
-  if (isSequenceFile())
+  if (isSequenceFile() || hdfsScanTdb.isCompressedFile())
      useLibhdfsScan_ = TRUE;
   lobGlob_ = NULL;
   hdfsScanBufMaxSize_ = hdfsScanTdb.hdfsBufSize_;
@@ -229,7 +230,7 @@ ExHdfsScanTcb::ExHdfsScanTcb(
                                         (char*)"");
   ex_assert(ehi_ != NULL, "Internal error: ehi_ is null in ExHdfsScan");
   HDFS_Client_RetCode hdfsClientRetcode;
-  hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode);
+  hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetcode);
   ex_assert(hdfsClientRetcode == HDFS_CLIENT_OK, "Internal error: HdfsClient::newInstance returned an error"); 
   // Populate the hdfsInfo list into an array to gain o(1) lookup access
   Queue* hdfsInfoList = hdfsScanTdb.getHdfsFileInfoList();
@@ -419,19 +420,23 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
   while (!qparent_.down->isEmpty())
     {
       ex_queue_entry *pentry_down = qparent_.down->getHeadEntry();
+      if (pentry_down->downState.request == ex_queue::GET_NOMORE && step_ != DONE) 
+      {
+          if (! useLibhdfsScan_)
+             step_ = STOP_HDFS_SCAN;
+      }
       switch (step_)
 	{
 	case NOT_STARTED:
 	  {
 	    matches_ = 0;
-	    
 	    beginRangeNum_ = -1;
 	    numRanges_ = -1;
 	    hdfsOffset_ = 0;
             checkRangeDelimiter_ = FALSE;
-
+            if (getStatsEntry())
+               hdfsStats_ = getStatsEntry()->castToExHdfsScanStats();
             dataModCheckDone_ = FALSE;
-
 	    myInstNum_ = getGlobals()->getMyInstanceNumber();
 	    hdfsScanBufMaxSize_ = hdfsScanTdb().hdfsBufSize_;
 
@@ -556,11 +561,14 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
           {   
              if (hdfsScan_ != NULL)
                 NADELETE(hdfsScan_, HdfsScan, getHeap());
+             if (hdfsFileInfoListAsArray_.entries() == 0) {
+                step_ = DONE;
+                break;
+             } 
              hdfsScan_ = HdfsScan::newInstance((NAHeap *)getHeap(), hdfsScanBuf_, hdfsScanBufMaxSize_, 
                             &hdfsFileInfoListAsArray_, beginRangeNum_, numRanges_, hdfsScanTdb().rangeTailIOSize_, 
                             hdfsStats_, hdfsScanRetCode);
-             if (hdfsScanRetCode != HDFS_SCAN_OK)
-             {
+             if (hdfsScanRetCode != HDFS_SCAN_OK) {
                 setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "SETUP_HDFS_SCAN", 
                               currContext->getJniErrorStr(), NULL);              
                 step_ = HANDLE_ERROR_AND_DONE;
@@ -579,7 +587,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
           break;
         case TRAF_HDFS_READ:
           {
-             hdfsScanRetCode = hdfsScan_->trafHdfsRead((NAHeap *)getHeap(), hdfsStats_, retArray_, sizeof(retArray_)/sizeof(int));
+             hdfsScanRetCode = hdfsScan_->trafHdfsRead(retArray_, sizeof(retArray_)/sizeof(int));
              if (hdfsScanRetCode == HDFS_SCAN_EOR) {
                 step_ = DONE;
                 break;
@@ -600,6 +608,11 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 else
                    recordSkip_ = TRUE; 
              } else {
+                // Throw away the rest of the data when done with the current range
+                if (currRangeBytesRead_ > hdfo->getBytesToRead()) {
+                   step_ = TRAF_HDFS_READ;
+                   break;
+                }
                 currRangeBytesRead_ += retArray_[BYTES_COMPLETED];
                 bufBegin_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ - headRoomCopied_;
                 recordSkip_ = FALSE;
@@ -608,10 +621,14 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                 extraBytesRead_ = currRangeBytesRead_ - hdfo->getBytesToRead(); 
              else
                 extraBytesRead_ = 0;
-             // headRoom_ is the number of extra bytes read (rangeTailIOSize)
+             // headRoom_ is the number of extra bytes to be read (rangeTailIOSize)
              // If EOF is reached while reading the range and the extraBytes read
-             // is less than headRoom_, then process all the data till EOF 
-             if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_)
+             // is less than headRoom_ then process all the data till EOF 
+             // TODO: If the whole range fits in one buffer, rows still need to be processed till EOF for the last range alone.
+             // There is no easy way to identify the last range read, but a range can be identified as not being the first one.
+             // The rows could be read more than once if there are more than 2 ranges.
+             // Fix the optimizer not to create more than 2 ranges in that case.
+             if (retArray_[IS_EOF] && extraBytesRead_ < headRoom_ && hdfo->getStartOffset() != 0)
                 extraBytesRead_ = 0;
              bufLogicalEnd_ = hdfsScanBuf_[retArray_[BUF_NO]].buf_ + retArray_[BYTES_COMPLETED] - extraBytesRead_;
              prevRangeNum_ = retArray_[RANGE_NO];
@@ -628,6 +645,8 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
                    step_ = HANDLE_ERROR_AND_DONE;
                    break;
                 }
+		// add changedLen since hdfs_strchr moves the pointer ahead to remove the \r
+		hdfsBufNextRow_ += 1 + changedLen;   // point past record delimiter.
              }
              else
                 hdfsBufNextRow_ = (char *)bufBegin_; 
@@ -646,6 +665,17 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
              step_ = TRAF_HDFS_READ;  
           }
           break;
+        case STOP_HDFS_SCAN:
+          {
+             hdfsScanRetCode = hdfsScan_->stop();
+             if (hdfsScanRetCode != HDFS_SCAN_OK) {
+                setupError(EXE_ERROR_HDFS_SCAN, hdfsScanRetCode, "HdfsScan::stop", 
+                              currContext->getJniErrorStr(), NULL);              
+                step_ = HANDLE_ERROR_AND_DONE;
+             }    
+             step_ = DONE;
+          }
+          break;
 	case INIT_HDFS_CURSOR:
 	  {
             hdfo_ = getRange(currRangeNum_);
@@ -667,6 +697,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
             sprintf(cursorId_, "%d", currRangeNum_);
             stopOffset_ = hdfsOffset_ + hdfo_->getBytesToRead();
 
+
 	    step_ = OPEN_HDFS_CURSOR;
 	  }
         
@@ -1076,16 +1107,19 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 
 	  if (startOfNextRow == NULL)
 	  {
-            if (useLibhdfsScan_)
+            if (useLibhdfsScan_) 
 	       step_ = REPOS_HDFS_DATA;
             else {
-               if (retArray_[IS_EOF]) 
+               if (retArray_[IS_EOF]) { 
+                  headRoomCopied_ = 0; 
                   step_ = TRAF_HDFS_READ;
+               }
                else
                   step_ = COPY_TAIL_TO_HEAD;
             }
+            // Looks like we can break always
 	    if (!exception_)
-	      break;
+	       break;
 	  }
 	  else
 	  {
@@ -1096,13 +1130,11 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
              } 
              else {
                 if ((BYTE *)startOfNextRow > bufLogicalEnd_) {
-                   step_ = TRAF_HDFS_READ;
+                   headRoomCopied_ = 0;
                    hdfsBufNextRow_ = NULL;
-                }
-                else
+                } else 
 	          hdfsBufNextRow_ = startOfNextRow;
-            }
-           
+             }
 	  }
            
 	  if (exception_)
@@ -1304,7 +1336,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 	          if ((pentry_down->downState.request == ex_queue::GET_N) &&
 	              (pentry_down->downState.requestValue == matches_)) {
                      if (useLibhdfsScan_)
-                        step_ = CLOSE_FILE;
+                        step_ = CLOSE_HDFS_CURSOR;
                      else
                         step_ = DONE;
                   }
@@ -1361,8 +1393,7 @@ ExWorkProcRetcode ExHdfsScanTcb::work()
 	            up_entry->getTupp(hdfsScanTdb().tuppIndex_).getDataPointer());
 	      }
 	    }
-	  }
-
+          }
 	  up_entry->upState.setMatchNo(++matches_);
 	  if (matches_ == matchBrkPoint_)
 	    brkpoint();
@@ -1744,17 +1775,18 @@ char * ExHdfsScanTcb::extractAndTransformAsciiSourceToSqlRow(int &err,
   {
      sourceRowEnd = hdfs_strchr(sourceData, rd, sourceDataEnd, checkRangeDelimiter_, mode, &changedLen);
      hdfsLoggingRowEnd_  = sourceRowEnd + changedLen;
-
+     
      if (sourceRowEnd == NULL)
         return NULL; 
-     if ((endOfRequestedRange) && 
+     if (useLibhdfsScan_) {
+        if ((endOfRequestedRange) && 
             (sourceRowEnd >= endOfRequestedRange)) {
-        checkRangeDelimiter_ = TRUE;
-        *(sourceRowEnd +1)= RANGE_DELIMITER;
+           checkRangeDelimiter_ = TRUE;
+           *(sourceRowEnd +1)= RANGE_DELIMITER;
+        }
      }
-
-    // no columns need to be converted. For e.g. count(*) with no predicate
-    return sourceRowEnd+1;
+     // no columns need to be converted. For e.g. count(*) with no predicate
+     return sourceRowEnd+1;
   }
 
   Lng32 neededColIndex = 0;
@@ -2016,54 +2048,6 @@ void ExHdfsScanTcb::deallocateRuntimeRanges()
     }
 }
 
-void ExHdfsScanTcb::handleException(NAHeap *heap,
-                                    char *logErrorRow,
-                                    Lng32 logErrorRowLen,
-                                    ComCondition *errorCond)
-{
-  Lng32 errorMsgLen = 0;
-  charBuf *cBuf = NULL;
-  char *errorMsg;
-  HDFS_Client_RetCode hdfsClientRetcode;
-
-  if (loggingErrorDiags_ != NULL)
-     return;
-
-  if (!loggingFileCreated_) {
-     hdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode);
-     if (hdfsClientRetcode == HDFS_CLIENT_OK)
-        hdfsClientRetcode = hdfsClient_->hdfsCreate(loggingFileName_, FALSE);
-     if (hdfsClientRetcode == HDFS_CLIENT_OK)
-        loggingFileCreated_ = TRUE;
-     else 
-        goto logErrorReturn;
-  }
-  hdfsClientRetcode = hdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen);
-  if (hdfsClientRetcode != HDFS_CLIENT_OK) 
-     goto logErrorReturn;
-  if (errorCond != NULL) {
-     errorMsgLen = errorCond->getMessageLength();
-     const NAWcharBuf wBuf((NAWchar*)errorCond->getMessageText(), errorMsgLen, heap);
-     cBuf = unicodeToISO88591(wBuf, heap, cBuf);
-     errorMsg = (char *)cBuf->data();
-     errorMsgLen = cBuf -> getStrLen();
-     errorMsg[errorMsgLen]='\n';
-     errorMsgLen++;
-  }
-  else {
-     errorMsg = (char *)"[UNKNOWN EXCEPTION]\n";
-     errorMsgLen = strlen(errorMsg);
-  }
-  hdfsClientRetcode = hdfsClient_->hdfsWrite(errorMsg, errorMsgLen);
-logErrorReturn:
-  if (hdfsClientRetcode != HDFS_CLIENT_OK) {
-     loggingErrorDiags_ = ComDiagsArea::allocate(heap);
-     *loggingErrorDiags_ << DgSqlCode(EXE_ERROR_WHILE_LOGGING)
-                 << DgString0(loggingFileName_)
-                 << DgString1((char *)GetCliGlobals()->currContext()->getJniErrorStr().data());
-  }
-}
-
 short ExHdfsScanTcb::moveRowToUpQueue(const char * row, Lng32 len, 
                                       short * rc, NABoolean isVarchar)
 {
@@ -2164,7 +2148,7 @@ short ExHdfsScanTcb::handleDone(ExWorkProcRetcode &rc)
 
   return 0;
 }
-/*
+
 void ExHdfsScanTcb::handleException(NAHeap *heap,
                                     char *logErrorRow,
                                     Lng32 logErrorRowLen,
@@ -2179,15 +2163,15 @@ void ExHdfsScanTcb::handleException(NAHeap *heap,
      return;
 
   if (!loggingFileCreated_) {
-     logFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), hdfsClientRetcode);
+     logFileHdfsClient_ = HdfsClient::newInstance((NAHeap *)getHeap(), NULL, hdfsClientRetcode);
      if (hdfsClientRetcode == HDFS_CLIENT_OK)
-        hdfsClientRetcode = logFileHdfsClient_->hdfsCreate(loggingFileName_, FALSE);
+        hdfsClientRetcode = logFileHdfsClient_->hdfsCreate(loggingFileName_, TRUE, FALSE);
      if (hdfsClientRetcode == HDFS_CLIENT_OK)
         loggingFileCreated_ = TRUE;
      else 
         goto logErrorReturn;
   }
-  hdfsClientRetcode = logFileHdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen);
+  logFileHdfsClient_->hdfsWrite(logErrorRow, logErrorRowLen, hdfsClientRetcode);
   if (hdfsClientRetcode != HDFS_CLIENT_OK) 
      goto logErrorReturn;
   if (errorCond != NULL) {
@@ -2203,7 +2187,7 @@ void ExHdfsScanTcb::handleException(NAHeap *heap,
      errorMsg = (char *)"[UNKNOWN EXCEPTION]\n";
      errorMsgLen = strlen(errorMsg);
   }
-  hdfsClientRetcode = logFileHdfsClient_->hdfsWrite(errorMsg, errorMsgLen);
+  logFileHdfsClient_->hdfsWrite(errorMsg, errorMsgLen, hdfsClientRetcode);
 logErrorReturn:
   if (hdfsClientRetcode != HDFS_CLIENT_OK) {
      loggingErrorDiags_ = ComDiagsArea::allocate(heap);
@@ -2212,7 +2196,7 @@ logErrorReturn:
                  << DgString1((char *)GetCliGlobals()->currContext()->getJniErrorStr().data());
   }
 }
-*/
+
 
 ////////////////////////////////////////////////////////////////////////
 // ORC files

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExHdfsScan.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHdfsScan.h b/core/sql/executor/ExHdfsScan.h
index 04b632e..aa01b6e 100644
--- a/core/sql/executor/ExHdfsScan.h
+++ b/core/sql/executor/ExHdfsScan.h
@@ -227,6 +227,7 @@ protected:
   , SETUP_HDFS_SCAN
   , TRAF_HDFS_READ
   , COPY_TAIL_TO_HEAD
+  , STOP_HDFS_SCAN
   } step_,nextStep_;
 
   /////////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExSMCommon.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExSMCommon.cpp b/core/sql/executor/ExSMCommon.cpp
index fde847f..d946732 100644
--- a/core/sql/executor/ExSMCommon.cpp
+++ b/core/sql/executor/ExSMCommon.cpp
@@ -29,6 +29,7 @@
 #include <sys/syscall.h>
 #include <unistd.h>
 #include <pthread.h>
+#include "Platform.h"
 #include "ExSMCommon.h"
 #include "ExSMGlobals.h"
 #include "ExSMEvent.h"

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExSMCommon.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExSMCommon.h b/core/sql/executor/ExSMCommon.h
index 1cfe3e3..0a23d68 100644
--- a/core/sql/executor/ExSMCommon.h
+++ b/core/sql/executor/ExSMCommon.h
@@ -23,11 +23,10 @@
 #ifndef EXSM_COMMON_H
 #define EXSM_COMMON_H
 
-#define __STDC_LIMIT_MACROS 
-#define __STDC_FORMAT_MACROS 
 #include <stdint.h>
 #include <inttypes.h>
 #include <stdio.h>
+#include "Platform.h"
 #include "sm.h"
 #include "NAAssert.h"
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExSMGlobals.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExSMGlobals.cpp b/core/sql/executor/ExSMGlobals.cpp
index 45dda94..fb205dc 100644
--- a/core/sql/executor/ExSMGlobals.cpp
+++ b/core/sql/executor/ExSMGlobals.cpp
@@ -26,6 +26,7 @@
 #include <signal.h>
 #include <errno.h>
 #include "seabed/pctl.h"
+#include "Platform.h"
 #include "ExSMGlobals.h"
 #include "ExSMTask.h"
 #include "ExSMReader.h"

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExSMQueue.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExSMQueue.cpp b/core/sql/executor/ExSMQueue.cpp
index ca1f857..7eac848 100644
--- a/core/sql/executor/ExSMQueue.cpp
+++ b/core/sql/executor/ExSMQueue.cpp
@@ -20,7 +20,7 @@
 //
 // @@@ END COPYRIGHT @@@
 **********************************************************************/
-
+#include "Platform.h"
 #include "ExSMQueue.h"
 #include "NAMemory.h"
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExSMReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExSMReader.cpp b/core/sql/executor/ExSMReader.cpp
index f59cbc7..315be70 100644
--- a/core/sql/executor/ExSMReader.cpp
+++ b/core/sql/executor/ExSMReader.cpp
@@ -23,6 +23,7 @@
 
 #include <unistd.h>
 #include <time.h>
+#include "Platform.h"
 #include "seabed/pctl.h"
 #include "seabed/pevents.h"
 #include "ExSMReader.h"

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExSMShortMessage.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExSMShortMessage.cpp b/core/sql/executor/ExSMShortMessage.cpp
index b564605..c2a24ef 100644
--- a/core/sql/executor/ExSMShortMessage.cpp
+++ b/core/sql/executor/ExSMShortMessage.cpp
@@ -20,7 +20,7 @@
 //
 // @@@ END COPYRIGHT @@@
 **********************************************************************/
-
+#include "Platform.h"
 #include "ExSMShortMessage.h"
 #include "ExSMGlobals.h"
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/ExSMTaskList.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExSMTaskList.cpp b/core/sql/executor/ExSMTaskList.cpp
index 698c63b..58e3c7c 100644
--- a/core/sql/executor/ExSMTaskList.cpp
+++ b/core/sql/executor/ExSMTaskList.cpp
@@ -20,7 +20,7 @@
 //
 // @@@ END COPYRIGHT @@@
 **********************************************************************/
-
+#include "Platform.h"
 #include "ExSMTaskList.h"
 #include "ExSMTask.h"
 #include "ExSMTrace.h"

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/HdfsClient_JNI.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.cpp b/core/sql/executor/HdfsClient_JNI.cpp
index 5f0f810..d5eb4ec 100644
--- a/core/sql/executor/HdfsClient_JNI.cpp
+++ b/core/sql/executor/HdfsClient_JNI.cpp
@@ -39,16 +39,30 @@ pthread_mutex_t HdfsScan::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
 static const char* const hdfsScanErrorEnumStr[] = 
 {
    "Error in HdfsScan::setScanRanges"
-  ,"Java Exception in HdfsScan::setScanRanges"
+  ,"Java exception in HdfsScan::setScanRanges"
   ,"Error in HdfsScan::trafHdfsRead"
-  ,"Java Exceptiokn in HdfsScan::trafHdfsRead"
+  ,"Java exception in HdfsScan::trafHdfsRead"
   , "Hdfs scan End of Ranges"
+  ,"Error in HdfsScan::stop"
+  ,"Java exception in HdfsScan::stop"
 };
 
  
 //////////////////////////////////////////////////////////////////////////////
 // 
 //////////////////////////////////////////////////////////////////////////////
+HdfsScan::~HdfsScan()
+{
+   if (j_buf1_ != NULL) {
+      jenv_->DeleteGlobalRef(j_buf1_);
+      j_buf1_ = NULL;
+   }
+   if (j_buf2_ != NULL) {
+      jenv_->DeleteGlobalRef(j_buf2_);
+      j_buf2_ = NULL;
+   }
+}
+
 HDFS_Scan_RetCode HdfsScan::init()
 {
   static char className[]="org/trafodion/sql/HdfsScan";
@@ -72,6 +86,8 @@ HDFS_Scan_RetCode HdfsScan::init()
     JavaMethods_[JM_SET_SCAN_RANGES].jm_signature = "(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;[Ljava/lang/String;[J[J[I)V";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_name      = "trafHdfsRead";
     JavaMethods_[JM_TRAF_HDFS_READ].jm_signature = "()[I";
+    JavaMethods_[JM_STOP].jm_name      = "stop";
+    JavaMethods_[JM_STOP].jm_signature = "()V";
    
     rc = (HDFS_Scan_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
     if (rc == HDFS_SCAN_OK)
@@ -90,9 +106,8 @@ char* HdfsScan::getErrorText(HDFS_Scan_RetCode errEnum)
 }
 
 /////////////////////////////////////////////////////////////////////////////
-HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,  int scanBufSize,
-      HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize,
-      ExHdfsScanStats *hdfsStats)
+HDFS_Scan_RetCode HdfsScan::setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf,  int scanBufSize,
+      HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::setScanRanges() called.");
 
@@ -105,13 +120,24 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN
       jenv_->PopLocalFrame(NULL);
       return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
    }
-
+   j_buf1_ = jenv_->NewGlobalRef(j_buf1);
+   if (j_buf1_ == NULL) {
+      GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM));
+      jenv_->PopLocalFrame(NULL);
+      return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
+   }
    jobject j_buf2 = jenv_->NewDirectByteBuffer(hdfsScanBuf[1].buf_, scanBufSize);
    if (j_buf2 == NULL) {
       GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM));
       jenv_->PopLocalFrame(NULL);
       return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
    }
+   j_buf2_ = jenv_->NewGlobalRef(j_buf2);
+   if (j_buf2_ == NULL) {
+      GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM));
+      jenv_->PopLocalFrame(NULL);
+      return HDFS_SCAN_ERROR_SET_SCAN_RANGES_PARAM;
+   }
    jobjectArray j_filenames = NULL;
    jlongArray j_offsets = NULL;
    jlongArray j_lens = NULL;  
@@ -172,13 +198,13 @@ HDFS_Scan_RetCode HdfsScan::setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN
        jenv_->SetIntArrayRegion(j_rangenums, rangeCount, 1, &tdbRangeNum);
    } 
 
-   if (hdfsStats)
-       hdfsStats->getHdfsTimer().start();
+   if (hdfsStats_ != NULL)
+       hdfsStats_->getHdfsTimer().start();
    tsRecentJMFromJNI = JavaMethods_[JM_SET_SCAN_RANGES].jm_full_name;
    jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_SET_SCAN_RANGES].methodID, j_buf1, j_buf2, j_filenames, j_offsets, j_lens, j_rangenums);
-   if (hdfsStats) {
-      hdfsStats->incMaxHdfsIOTime(hdfsStats->getHdfsTimer().stop());
-      hdfsStats->incHdfsCalls();
+   if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
    }
 
    if (jenv_->ExceptionCheck()) {
@@ -204,9 +230,11 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfs
    if (hdfsScan != NULL) {
        hdfsScanRetCode = hdfsScan->init();
        if (hdfsScanRetCode == HDFS_SCAN_OK) 
-          hdfsScanRetCode = hdfsScan->setScanRanges(heap, hdfsScanBuf, scanBufSize, 
-                    hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize, hdfsStats); 
-       if (hdfsScanRetCode != HDFS_SCAN_OK) {
+          hdfsScanRetCode = hdfsScan->setScanRanges(hdfsScanBuf, scanBufSize, 
+                    hdfsFileInfoArray, beginRangeNum, numRanges, rangeTailIOSize); 
+       if (hdfsScanRetCode == HDFS_SCAN_OK) 
+          hdfsScan->setHdfsStats(hdfsStats);
+       else {
           NADELETE(hdfsScan, HdfsScan, heap);
           hdfsScan = NULL;
        }
@@ -215,20 +243,20 @@ HdfsScan *HdfsScan::newInstance(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfs
 }
 
 
-HDFS_Scan_RetCode HdfsScan::trafHdfsRead(NAHeap *heap, ExHdfsScanStats *hdfsStats, int retArray[], short arrayLen)
+HDFS_Scan_RetCode HdfsScan::trafHdfsRead(int retArray[], short arrayLen)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::trafHdfsRead() called.");
 
    if (initJNIEnv() != JOI_OK)
      return HDFS_SCAN_ERROR_TRAF_HDFS_READ_PARAM;
 
-   if (hdfsStats)
-       hdfsStats->getHdfsTimer().start();
+   if (hdfsStats_ != NULL)
+       hdfsStats_->getHdfsTimer().start();
    tsRecentJMFromJNI = JavaMethods_[JM_TRAF_HDFS_READ].jm_full_name;
    jintArray j_retArray = (jintArray)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_TRAF_HDFS_READ].methodID);
-   if (hdfsStats) {
-      hdfsStats->incMaxHdfsIOTime(hdfsStats->getHdfsTimer().stop());
-      hdfsStats->incHdfsCalls();
+   if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
    }
 
    if (jenv_->ExceptionCheck()) {
@@ -247,6 +275,32 @@ HDFS_Scan_RetCode HdfsScan::trafHdfsRead(NAHeap *heap, ExHdfsScanStats *hdfsStat
    return HDFS_SCAN_OK;
 }
 
+HDFS_Scan_RetCode HdfsScan::stop()
+{
+   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsScan::stop() called.");
+
+   if (initJNIEnv() != JOI_OK)
+     return HDFS_SCAN_ERROR_STOP_PARAM;
+
+   if (hdfsStats_ != NULL)
+       hdfsStats_->getHdfsTimer().start();
+   tsRecentJMFromJNI = JavaMethods_[JM_STOP].jm_full_name;
+   jenv_->CallVoidMethod(javaObj_, JavaMethods_[JM_STOP].methodID);
+   if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
+   }
+
+   if (jenv_->ExceptionCheck()) {
+      getExceptionDetails();
+      logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+      logError(CAT_SQL_HDFS, "HdfsScan::stop()", getLastError());
+      jenv_->PopLocalFrame(NULL);
+      return HDFS_SCAN_ERROR_STOP_EXCEPTION;
+   }
+   return HDFS_SCAN_OK;
+}
+
 // ===========================================================================
 // ===== Class HdfsClient
 // ===========================================================================
@@ -258,24 +312,33 @@ pthread_mutex_t HdfsClient::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
 
 static const char* const hdfsClientErrorEnumStr[] = 
 {
-  "JNI NewStringUTF() in hdfsCreate()."
- ,"Java exception in hdfsCreate()."
- ,"JNI NewStringUTF() in hdfsOpen()."
- ,"Java exception in hdfsOpen()."
- ,"JNI NewStringUTF() in hdfsWrite()."
- ,"Java exception in hdfsWrite()."
- ,"Java exception in hdfsClose()."
- ,"JNI NewStringUTF() in hdfsMergeFiles()."
- ,"Java exception in hdfsMergeFiles()."
- ,"JNI NewStringUTF() in hdfsCleanUnloadPath()."
- ,"Java exception in hdfsCleanUnloadPath()."
- ,"JNI NewStringUTF() in hdfsExists()."
- ,"Java exception in hdfsExists()."
- ,"JNI NewStringUTF() in hdfsDeletePath()."
- ,"Java exception in hdfsDeletePath()."
- ,"Error in setHdfsFileInfo()."
- ,"Error in hdfsListDirectory()."
- ,"Java exception in hdfsListDirectory()."
+  "JNI NewStringUTF() in HdfsClient::hdfsCreate()."
+ ,"Java exception in HdfsClient::hdfsCreate()."
+ ,"JNI NewStringUTF() in HdfsClient::hdfsOpen()."
+ ,"Java exception in HdfsClient::hdfsOpen()."
+ ,"JNI NewStringUTF() in HdfsClient::hdfsWrite()."
+ ,"Java exception in HdfsClient::hdfsWrite()."
+ ,"Error in HdfsClient::hdfsRead()."
+ ,"Java exception in HdfsClient::hdfsRead()."
+ ,"Java exception in HdfsClient::hdfsClose()."
+ ,"JNI NewStringUTF() in HdfsClient::hdfsMergeFiles()."
+ ,"Java exception in HdfsClient::hdfsMergeFiles()."
+ ,"JNI NewStringUTF() in HdfsClient::hdfsCleanUnloadPath()."
+ ,"Java exception in HdfsClient::hdfsCleanUnloadPath()."
+ ,"JNI NewStringUTF() in HdfsClient::hdfsExists()."
+ ,"Java exception in HdfsClient::hdfsExists()."
+ ,"JNI NewStringUTF() in HdfsClient::hdfsDeletePath()."
+ ,"Java exception in HdfsClient::hdfsDeletePath()."
+ ,"Error in HdfsClient::setHdfsFileInfo()."
+ ,"Error in HdfsClient::hdfsListDirectory()."
+ ,"Java exception in HdfsClient::hdfsListDirectory()."
+ ,"Error preparing parameters for HdfsClient::getHiveTableMaxModificationTs()."
+ ,"Java exception in HdfsClient::getHiveTableMaxModificationTs()."
+ ,"Error in HdfsClient::getFsDefaultName()."
+ ,"Java exception in HdfsClient::getFsDefaultName()."
+ ,"Buffer is too small in HdfsClient::getFsDefaultName()."
+ ,"Error in HdfsClient::hdfsCreateDirectory()."
+ ,"Java exception in HdfsClient::hdfsCreateDirectory()."
 };
 
 //////////////////////////////////////////////////////////////////////////////
@@ -285,9 +348,24 @@ HdfsClient::~HdfsClient()
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::~HdfsClient() called.");
    deleteHdfsFileInfo();
+   if (path_ != NULL) 
+      NADELETEBASIC(path_, getHeap());
+}
+
+void HdfsClient::deleteHdfsFileInfo()
+{
+   for (int i = 0; i < numFiles_ ; i ++) {
+      NADELETEBASIC(hdfsFileInfo_[i].mName, getHeap());
+      NADELETEBASIC(hdfsFileInfo_[i].mOwner, getHeap());
+      NADELETEBASIC(hdfsFileInfo_[i].mGroup, getHeap());
+   }
+   if (hdfsFileInfo_ != NULL)
+      NADELETEBASICARRAY(hdfsFileInfo_, getHeap()); 
+   numFiles_ = 0;
+   hdfsFileInfo_ = NULL;
 }
 
-HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode)
+HdfsClient *HdfsClient::newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode)
 {
    QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::newInstance() called.");
 
@@ -297,7 +375,9 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode)
    HdfsClient *hdfsClient = new (heap) HdfsClient(heap);
    if (hdfsClient != NULL) {
        retCode = hdfsClient->init();
-       if (retCode != HDFS_CLIENT_OK) {
+       if (retCode == HDFS_CLIENT_OK) 
+          hdfsClient->setHdfsStats(hdfsStats);
+       else {
           NADELETE(hdfsClient, HdfsClient, heap);
           hdfsClient = NULL;
        }
@@ -305,16 +385,30 @@ HdfsClient *HdfsClient::newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode)
    return hdfsClient;
 }
 
-void HdfsClient::deleteHdfsFileInfo()
+HdfsClient* HdfsClient::getInstance()
 {
-   for (int i = 0; i < numFiles_ ; i ++) {
-      NADELETEBASIC(hdfsFileInfo_[i].mName, getHeap());
-      NADELETEBASIC(hdfsFileInfo_[i].mOwner, getHeap());
-      NADELETEBASIC(hdfsFileInfo_[i].mGroup, getHeap());
+   ContextCli *currContext = GetCliGlobals()->currContext();
+   HdfsClient *hdfsClient = currContext->getHDFSClient();
+   HDFS_Client_RetCode retcode;
+   if (hdfsClient == NULL) {
+      NAHeap *heap = currContext->exHeap();
+      hdfsClient = newInstance(heap, NULL, retcode);
+      if (retcode != HDFS_CLIENT_OK)
+         return NULL; 
+      currContext->setHDFSClient(hdfsClient);
+   }
+   return hdfsClient;
+}
+
+void HdfsClient::deleteInstance()
+{
+   ContextCli *currContext = GetCliGlobals()->currContext();
+   HdfsClient *hdfsClient = currContext->getHDFSClient();
+   if (hdfsClient != NULL) {
+      NAHeap *heap = currContext->exHeap();
+      NADELETE(hdfsClient, HdfsClient, heap);
+      currContext->setHDFSClient(NULL);
    }
-   NADELETEBASIC(hdfsFileInfo_, getHeap()); 
-   numFiles_ = 0;
-   hdfsFileInfo_ = NULL;
 }
 
 HDFS_Client_RetCode HdfsClient::init()
@@ -337,11 +431,13 @@ HDFS_Client_RetCode HdfsClient::init()
     JavaMethods_[JM_CTOR      ].jm_name      = "<init>";
     JavaMethods_[JM_CTOR      ].jm_signature = "()V";
     JavaMethods_[JM_HDFS_CREATE     ].jm_name      = "hdfsCreate";
-    JavaMethods_[JM_HDFS_CREATE     ].jm_signature = "(Ljava/lang/String;Z)Z";
+    JavaMethods_[JM_HDFS_CREATE     ].jm_signature = "(Ljava/lang/String;ZZ)Z";
     JavaMethods_[JM_HDFS_OPEN       ].jm_name      = "hdfsOpen";
     JavaMethods_[JM_HDFS_OPEN       ].jm_signature = "(Ljava/lang/String;Z)Z";
     JavaMethods_[JM_HDFS_WRITE      ].jm_name      = "hdfsWrite";
-    JavaMethods_[JM_HDFS_WRITE      ].jm_signature = "([BJ)Z";
+    JavaMethods_[JM_HDFS_WRITE      ].jm_signature = "([B)I";
+    JavaMethods_[JM_HDFS_READ       ].jm_name      = "hdfsRead";
+    JavaMethods_[JM_HDFS_READ       ].jm_signature = "(Ljava/nio/ByteBuffer;)I";
     JavaMethods_[JM_HDFS_CLOSE      ].jm_name      = "hdfsClose";
     JavaMethods_[JM_HDFS_CLOSE      ].jm_signature = "()Z";
     JavaMethods_[JM_HDFS_MERGE_FILES].jm_name      = "hdfsMergeFiles";
@@ -354,6 +450,12 @@ HDFS_Client_RetCode HdfsClient::init()
     JavaMethods_[JM_HDFS_DELETE_PATH].jm_signature = "(Ljava/lang/String;)Z";
     JavaMethods_[JM_HDFS_LIST_DIRECTORY].jm_name      = "hdfsListDirectory";
     JavaMethods_[JM_HDFS_LIST_DIRECTORY].jm_signature = "(Ljava/lang/String;J)I";
+    JavaMethods_[JM_HIVE_TBL_MAX_MODIFICATION_TS].jm_name      = "getHiveTableMaxModificationTs";
+    JavaMethods_[JM_HIVE_TBL_MAX_MODIFICATION_TS ].jm_signature = "(Ljava/lang/String;I)J";
+    JavaMethods_[JM_GET_FS_DEFAULT_NAME].jm_name      = "getFsDefaultName";
+    JavaMethods_[JM_GET_FS_DEFAULT_NAME].jm_signature = "()Ljava/lang/String;";
+    JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_name      = "hdfsCreateDirectory";
+    JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_signature = "(Ljava/lang/String;)Z";
     rc = (HDFS_Client_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
     if (rc == HDFS_CLIENT_OK)
        javaMethodsInitialized_ = TRUE;
@@ -373,13 +475,22 @@ char* HdfsClient::getErrorText(HDFS_Client_RetCode errEnum)
     return (char*)hdfsClientErrorEnumStr[errEnum-HDFS_CLIENT_FIRST];
 }
 
-HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean compress)
+void HdfsClient::setPath(const char *path)
+{
+   if (path_ != NULL) 
+      NADELETEBASIC(path_, getHeap());
+   short len = strlen(path);
+   path_ = new (getHeap()) char[len+1];
+   strcpy(path_, path); 
+}
+
+HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean overwrite, NABoolean compress)
 {
   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsCreate(%s) called.", path);
 
   if (initJNIEnv() != JOI_OK)
      return HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM;
-
+  setPath(path);
   jstring js_path = jenv_->NewStringUTF(path);
   if (js_path == NULL) {
     GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_CREATE_PARAM));
@@ -388,9 +499,17 @@ HDFS_Client_RetCode HdfsClient::hdfsCreate(const char* path, NABoolean compress)
   }
 
   jboolean j_compress = compress;
+  jboolean j_overwrite = overwrite;
+
+  if (hdfsStats_ != NULL)
+     hdfsStats_->getHdfsTimer().start();
 
   tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CREATE].jm_full_name;
-  jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CREATE].methodID, js_path, j_compress);
+  jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CREATE].methodID, js_path, j_overwrite, j_compress);
+  if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
+  }
 
   if (jenv_->ExceptionCheck())
   {
@@ -427,9 +546,14 @@ HDFS_Client_RetCode HdfsClient::hdfsOpen(const char* path, NABoolean compress)
   }
 
   jboolean j_compress = compress;
-
+  if (hdfsStats_ != NULL)
+     hdfsStats_->getHdfsTimer().start();
   tsRecentJMFromJNI = JavaMethods_[JM_HDFS_OPEN].jm_full_name;
   jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_OPEN].methodID, js_path, j_compress);
+  if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
+  }
 
   if (jenv_->ExceptionCheck())
   {
@@ -452,50 +576,92 @@ HDFS_Client_RetCode HdfsClient::hdfsOpen(const char* path, NABoolean compress)
 }
 
 
-HDFS_Client_RetCode HdfsClient::hdfsWrite(const char* data, Int64 len)
+Int32 HdfsClient::hdfsWrite(const char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode)
 {
   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsWrite(%ld) called.", len);
 
-  if (initJNIEnv() != JOI_OK)
-     return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
+  if (initJNIEnv() != JOI_OK) {
+     hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
+     return 0;
+  }
 
   //Write the requisite bytes into the file
   jbyteArray jbArray = jenv_->NewByteArray( len);
   if (!jbArray) {
     GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM));
     jenv_->PopLocalFrame(NULL);
-    return HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM;
+    hdfsClientRetcode =  HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM;
+    return 0;
   }
   jenv_->SetByteArrayRegion(jbArray, 0, len, (const jbyte*)data);
 
-  jlong j_len = len;
+  if (hdfsStats_ != NULL)
+     hdfsStats_->getHdfsTimer().start();
+
   tsRecentJMFromJNI = JavaMethods_[JM_HDFS_WRITE].jm_full_name;
-  jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID,jbArray , j_len);
+  // Java method returns the cumulative bytes written
+  jint totalBytesWritten = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_WRITE].methodID, jbArray);
 
+  if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
+  }
   if (jenv_->ExceptionCheck())
   {
     getExceptionDetails();
     logError(CAT_SQL_HDFS, __FILE__, __LINE__);
     logError(CAT_SQL_HDFS, "HdfsClient::hdfsWrite()", getLastError());
     jenv_->PopLocalFrame(NULL);
-    return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
+    hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
+    return 0;
   }
 
-  if (jresult == false)
+  jenv_->PopLocalFrame(NULL);
+  hdfsClientRetcode = HDFS_CLIENT_OK;
+  Int32 bytesWritten = totalBytesWritten - totalBytesWritten_;
+  totalBytesWritten_ = totalBytesWritten;
+  return bytesWritten; 
+}
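+// Illustrative caller pattern (a sketch, not part of this change):
+// the byte count alone cannot signal failure, so callers should test
+// the out-parameter:
+//   HDFS_Client_RetCode rc;
+//   Int32 written = hdfsClient->hdfsWrite(buf, len, rc);
+//   if (rc != HDFS_CLIENT_OK) { /* report or retry */ }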
+
+Int32 HdfsClient::hdfsRead(char* data, Int64 len, HDFS_Client_RetCode &hdfsClientRetcode)
+{
+   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsRead(%ld) called.", len);
+
+   if (initJNIEnv() != JOI_OK) {
+      hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION;
+      return 0;
+   }
+   jobject j_buf = jenv_->NewDirectByteBuffer((BYTE *)data, len);
+   if (j_buf == NULL) {
+      GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_READ_PARAM));
+      jenv_->PopLocalFrame(NULL);
+      hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_READ_PARAM;
+      return 0;
+   }
+  if (hdfsStats_ != NULL)
+     hdfsStats_->getHdfsTimer().start();
+
+  tsRecentJMFromJNI = JavaMethods_[JM_HDFS_READ].jm_full_name;
+  jint bytesRead = 0;
+  bytesRead = jenv_->CallIntMethod(javaObj_, JavaMethods_[JM_HDFS_READ].methodID, j_buf);
+
+  if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
+  }
+  if (jenv_->ExceptionCheck())
   {
-    logError(CAT_SQL_HDFS, "HdfsClient::hdfsWrite()", getLastError());
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS, __FILE__, __LINE__);
+    logError(CAT_SQL_HDFS, "HdfsClient::hdfsRead()", getLastError());
     jenv_->PopLocalFrame(NULL);
-    return HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION;
+    hdfsClientRetcode = HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION;
+    return 0;
   }
-
-
   jenv_->PopLocalFrame(NULL);
-  return HDFS_CLIENT_OK;
+  hdfsClientRetcode = HDFS_CLIENT_OK;
+  return bytesRead; 
 }
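+// Design note: hdfsRead() wraps the caller's buffer in a direct
+// ByteBuffer (NewDirectByteBuffer), letting the Java layer read
+// straight into native memory -- in line with this change's goal of
+// avoiding an unnecessary data copy.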
 
-//////////////////////////////////////////////////////////////////////////////
-//
-//////////////////////////////////////////////////////////////////////////////
 HDFS_Client_RetCode HdfsClient::hdfsClose()
 {
   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::close() called.");
@@ -504,9 +670,15 @@ HDFS_Client_RetCode HdfsClient::hdfsClose()
      return HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION;
 
   // String close();
+  if (hdfsStats_ != NULL)
+     hdfsStats_->getHdfsTimer().start();
   tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLOSE].jm_full_name;
   jboolean jresult = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_HDFS_CLOSE].methodID);
 
+  if (hdfsStats_ != NULL) {
+      hdfsStats_->incMaxHdfsIOTime(hdfsStats_->getHdfsTimer().stop());
+      hdfsStats_->incHdfsCalls();
+  }
   if (jenv_->ExceptionCheck())
   {
     getExceptionDetails();
@@ -533,6 +705,8 @@ HDFS_Client_RetCode HdfsClient::hdfsCleanUnloadPath( const NAString& uldPath)
                                                                              uldPath.data());
   if (initJNIEnv() != JOI_OK)
      return HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM;
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM;
   jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
   if (js_UldPath == NULL) {
     GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HDFS_CLEANUP_PARAM));
@@ -563,8 +737,9 @@ HDFS_Client_RetCode HdfsClient::hdfsMergeFiles( const NAString& srcPath,
                   srcPath.data(), dstPath.data());
 
   if (initJNIEnv() != JOI_OK)
-     return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_EXCEPTION;
-
+     return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM;
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM;
   jstring js_SrcPath = jenv_->NewStringUTF(srcPath.data());
 
   if (js_SrcPath == NULL) {
@@ -608,7 +783,9 @@ HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath)
   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsDeletePath(%s called.",
                   delPath.data());
   if (initJNIEnv() != JOI_OK)
-     return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_EXCEPTION;
+     return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_PARAM;
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_PARAM;
 
   jstring js_delPath = jenv_->NewStringUTF(delPath.data());
   if (js_delPath == NULL) {
@@ -617,7 +794,6 @@ HDFS_Client_RetCode HdfsClient::hdfsDeletePath( const NAString& delPath)
      return HDFS_CLIENT_ERROR_HDFS_DELETE_PATH_PARAM;
   }
 
-
   tsRecentJMFromJNI = JavaMethods_[JM_HDFS_DELETE_PATH].jm_full_name;
   jboolean jresult = jenv_->CallStaticBooleanMethod(javaClass_, JavaMethods_[JM_HDFS_DELETE_PATH].methodID, js_delPath);
 
@@ -676,9 +852,10 @@ HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean &
 {
   QRLogger::log(CAT_SQL_HDFS, LL_DEBUG, "HdfsClient::hdfsExists(%s) called.",
                                                       uldPath.data());
-
   if (initJNIEnv() != JOI_OK)
-     return HDFS_CLIENT_ERROR_HDFS_EXISTS_EXCEPTION;
+     return HDFS_CLIENT_ERROR_HDFS_EXISTS_PARAM;
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_HDFS_EXISTS_PARAM;
 
   jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
   if (js_UldPath == NULL) {
@@ -701,6 +878,113 @@ HDFS_Client_RetCode HdfsClient::hdfsExists( const NAString& uldPath, NABoolean &
   return HDFS_CLIENT_OK;
 }
 
+HDFS_Client_RetCode HdfsClient::getHiveTableMaxModificationTs( Int64& maxModificationTs, const char * tableDirPaths,  int levelDeep)
+{
+  QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "HdfsClient::getHiveTableMaxModificationTs(%s) called.", tableDirPaths);
+  if (initJNIEnv() != JOI_OK)
+     return HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM;
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM; 
+  jstring js_tableDirPaths = jenv_->NewStringUTF(tableDirPaths);
+  if (js_tableDirPaths == NULL)
+  {
+    GetCliGlobals()->setJniErrorStr(getErrorText(HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM));
+    jenv_->PopLocalFrame(NULL);
+    return HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM;
+  }
+
+  jint jlevelDeep = levelDeep;
+  tsRecentJMFromJNI = JavaMethods_[JM_HIVE_TBL_MAX_MODIFICATION_TS].jm_full_name;
+  jlong jresult = jenv_->CallStaticLongMethod(javaClass_,
+                                          JavaMethods_[JM_HIVE_TBL_MAX_MODIFICATION_TS].methodID,
+                                          js_tableDirPaths, jlevelDeep);
+  jenv_->DeleteLocalRef(js_tableDirPaths);
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails(jenv_);
+    logError(CAT_SQL_HBASE, __FILE__, __LINE__);
+    logError(CAT_SQL_HBASE, "HDFSClientI::getHiveTableMaxModificationTs()", getLastError());
+    jenv_->PopLocalFrame(NULL);
+    return HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_EXCEPTION;
+  }
+  QRLogger::log(CAT_SQL_HBASE, LL_DEBUG,
+       "Exit HdfsClient::getHiveTableMaxModificationTs().");
+  maxModificationTs = jresult;
+  jenv_->PopLocalFrame(NULL);
+
+  return HDFS_CLIENT_OK;
+}
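+// Illustrative use (a sketch, mirroring ExLob::dataModCheck in
+// ExpLOBaccess.cpp below): pass the table's root directory and its
+// partition depth, then compare the returned millisecond timestamp
+// with the one captured at compile time:
+//   Int64 maxTs = 0;
+//   if (HdfsClient::getHiveTableMaxModificationTs(maxTs, dirPath,
+//           numOfPartLevels) == HDFS_CLIENT_OK && maxTs > inputModTS)
+//      ... // table data changed since compilation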
+
+HDFS_Client_RetCode HdfsClient::getFsDefaultName(char* buf, int buf_len)
+{
+  QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "Enter HDFSClient_JNI::getFsDefaultName() called.");
+  if (initJNIEnv() != JOI_OK)
+     return HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_PARAM;
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_PARAM;
+
+  tsRecentJMFromJNI = JavaMethods_[JM_GET_FS_DEFAULT_NAME].jm_full_name;
+  jstring jresult = 
+        (jstring)jenv_->CallStaticObjectMethod(javaClass_,
+                              JavaMethods_[JM_GET_FS_DEFAULT_NAME].methodID);
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails(jenv_);
+    logError(CAT_SQL_HBASE, __FILE__, __LINE__);
+    logError(CAT_SQL_HBASE, "HDFSClient_JNI::getFsDefaultName()", getLastError());
+    jenv_->PopLocalFrame(NULL);
+    return HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_EXCEPTION;
+  }
+  const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
+
+  HDFS_Client_RetCode retcode = HDFS_CLIENT_OK;
+  if ( buf_len > (Int32)strlen(char_result) ) {  // leave room for the NUL terminator
+     strcpy(buf, char_result);
+  } else
+     retcode = HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_BUFFER_TOO_SMALL;
+
+  jenv_->ReleaseStringUTFChars(jresult, char_result);
+  jenv_->PopLocalFrame(NULL);
+
+  return retcode;
+}
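+// Typical call pattern (a sketch): supply a pre-sized buffer and
+// handle the BUFFER_TOO_SMALL return code:
+//   char fsName[512];
+//   if (HdfsClient::getFsDefaultName(fsName, sizeof(fsName)) != HDFS_CLIENT_OK)
+//      ... // report the error or fall back to a configured default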
+
+HDFS_Client_RetCode HdfsClient::hdfsCreateDirectory(const NAString &dirName)
+{
+  QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "Enter HDFSClient_JNI::createDirectory() called.");
+  if (initJNIEnv() != JOI_OK)
+     return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM;
+  if (getInstance() == NULL)
+     return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM;
+
+  jstring js_dirName = jenv_->NewStringUTF(dirName.data());
+  if (js_dirName == NULL) {
+     jenv_->PopLocalFrame(NULL);
+     return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM;
+  }
+
+  tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CREATE_DIRECTORY].jm_full_name;
+  jboolean jresult = 
+        jenv_->CallStaticBooleanMethod(javaClass_,
+                              JavaMethods_[JM_HDFS_CREATE_DIRECTORY].methodID, js_dirName);
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails(jenv_);
+    logError(CAT_SQL_HBASE, __FILE__, __LINE__);
+    logError(CAT_SQL_HBASE, "HDFSClient_JNI::hdfsCreateDirectory()", getLastError());
+    jenv_->PopLocalFrame(NULL);
+    return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION;
+  }
+  if (jresult == false)
+  {
+    logError(CAT_SQL_HDFS, "HdfsClient::hdfsCreateDirectory()", getLastError());
+    jenv_->PopLocalFrame(NULL);
+    return HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION;
+  }
+  jenv_->PopLocalFrame(NULL);
+  return HDFS_CLIENT_OK;
+}
+
 HDFS_Client_RetCode HdfsClient::setHdfsFileInfo(JNIEnv *jenv, jint numFiles, jint fileNo, jboolean isDir, 
           jstring filename, jlong modTime, jlong len, jshort numReplicas, jlong blockSize, 
           jstring owner, jstring group, jshort permissions, jlong accessTime)

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/HdfsClient_JNI.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/HdfsClient_JNI.h b/core/sql/executor/HdfsClient_JNI.h
index c45d226..6f68f4d 100644
--- a/core/sql/executor/HdfsClient_JNI.h
+++ b/core/sql/executor/HdfsClient_JNI.h
@@ -39,6 +39,8 @@ typedef enum {
   ,HDFS_SCAN_ERROR_TRAF_HDFS_READ_PARAM
   ,HDFS_SCAN_ERROR_TRAF_HDFS_READ_EXCEPTION
   ,HDFS_SCAN_EOR
+  ,HDFS_SCAN_ERROR_STOP_PARAM
+  ,HDFS_SCAN_ERROR_STOP_EXCEPTION
   ,HDFS_SCAN_LAST
 } HDFS_Scan_RetCode;
 
@@ -48,11 +50,18 @@ public:
   // Default constructor - for creating a new JVM		
   HdfsScan(NAHeap *heap)
   :  JavaObjectInterface(heap) 
+  , j_buf1_(NULL)
+  , j_buf2_(NULL)
+  , hdfsStats_(NULL)
   {}
 
+  ~HdfsScan();
+
   // Initialize JVM and all the JNI configuration.
   // Must be called.
   HDFS_Scan_RetCode init();
+  void setHdfsStats(ExHdfsScanStats *hdfsStats)
+  { hdfsStats_ = hdfsStats; } 
 
   // Get the error description.
   static char* getErrorText(HDFS_Scan_RetCode errEnum);
@@ -61,19 +70,25 @@ public:
             HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, int rangeTailIOSize,
             ExHdfsScanStats *hdfsStats, HDFS_Scan_RetCode &hdfsScanRetCode);
 
-  HDFS_Scan_RetCode setScanRanges(NAHeap *heap, ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, 
+  HDFS_Scan_RetCode setScanRanges(ExHdfsScanTcb::HDFS_SCAN_BUF *hdfsScanBuf, int scanBufSize, 
             HdfsFileInfoArray *hdfsFileInfoArray, Int32 beginRangeNum, Int32 numRanges, 
-            int rangeTailIOSize, ExHdfsScanStats *hdfsStats);
+            int rangeTailIOSize);
+
+  HDFS_Scan_RetCode trafHdfsRead(int retArray[], short arrayLen);
 
-  HDFS_Scan_RetCode trafHdfsRead(NAHeap *heap, ExHdfsScanStats *hdfsStats, int retArray[], short arrayLen);
+  HDFS_Scan_RetCode stop();
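+  // stop() presumably signals the Java layer (via the JM_STOP method)
+  // to abandon any in-flight prefetch before the scan object is freed.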
 
 private:
   enum JAVA_METHODS {
     JM_CTOR = 0, 
     JM_SET_SCAN_RANGES,
     JM_TRAF_HDFS_READ,
+    JM_STOP,
     JM_LAST
   };
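+  // Global references to the two direct ByteBuffers used for the
+  // double-buffering scheme: the Java layer fills one buffer while
+  // the native side drains the other.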
+  jobject j_buf1_;
+  jobject j_buf2_;
+  ExHdfsScanStats *hdfsStats_;
   static jclass javaClass_;
   static JavaMethodInit* JavaMethods_;
   static bool javaMethodsInitialized_;
@@ -114,6 +129,8 @@ typedef enum {
  ,HDFS_CLIENT_ERROR_HDFS_OPEN_EXCEPTION
  ,HDFS_CLIENT_ERROR_HDFS_WRITE_PARAM
  ,HDFS_CLIENT_ERROR_HDFS_WRITE_EXCEPTION
+ ,HDFS_CLIENT_ERROR_HDFS_READ_PARAM
+ ,HDFS_CLIENT_ERROR_HDFS_READ_EXCEPTION
  ,HDFS_CLIENT_ERROR_HDFS_CLOSE_EXCEPTION
  ,HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_PARAM
  ,HDFS_CLIENT_ERROR_HDFS_MERGE_FILES_EXCEPTION
@@ -127,6 +144,13 @@ typedef enum {
  ,HDFS_CLIENT_ERROR_SET_HDFSFILEINFO
  ,HDFS_CLIENT_ERROR_HDFS_LIST_DIR_PARAM
  ,HDFS_CLIENT_ERROR_HDFS_LIST_DIR_EXCEPTION
+ ,HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_PARAM
+ ,HDFS_CLIENT_ERROR_HIVE_TBL_MAX_MODIFICATION_TS_EXCEPTION
+ ,HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_PARAM
+ ,HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_EXCEPTION
+ ,HDFS_CLIENT_ERROR_GET_FS_DEFAULT_NAME_BUFFER_TOO_SMALL
+ ,HDFS_CLIENT_ERROR_CREATE_DIRECTORY_PARAM
+ ,HDFS_CLIENT_ERROR_CREATE_DIRECTORY_EXCEPTION
  ,HDFS_CLIENT_LAST
 } HDFS_Client_RetCode;
 
@@ -136,34 +160,43 @@ public:
   // Default constructor - for creating a new JVM		
   HdfsClient(NAHeap *heap)
   :  JavaObjectInterface(heap) 
+    , path_(NULL)
     , hdfsFileInfo_(NULL) 
     , numFiles_(0)
+    , totalBytesWritten_(0)
+    , hdfsStats_(NULL)
   {
   }
  
   ~HdfsClient();
-  static HdfsClient *newInstance(NAHeap *heap, HDFS_Client_RetCode &retCode);
+  static HdfsClient *newInstance(NAHeap *heap, ExHdfsScanStats *hdfsStats, HDFS_Client_RetCode &retCode);
+  static HdfsClient *getInstance();
+  static void deleteInstance();
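+  // newInstance()/getInstance()/deleteInstance() manage a shared
+  // HdfsClient instance that the static convenience methods below
+  // depend on; they return a _PARAM error when no instance exists.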
 
   // Get the error description.
   static char* getErrorText(HDFS_Client_RetCode errEnum);
-  
-  // Initialize JVM and all the JNI configuration.
-  // Must be called.
+  void setHdfsStats(ExHdfsScanStats *hdfsStats)
+  { hdfsStats_ = hdfsStats; } 
   HDFS_Client_RetCode    init();
-  HDFS_Client_RetCode    hdfsCreate(const char* path, NABoolean compress);
+  HDFS_Client_RetCode    hdfsCreate(const char* path, NABoolean overwrite, NABoolean compress);
   HDFS_Client_RetCode    hdfsOpen(const char* path, NABoolean compress);
-  HDFS_Client_RetCode    hdfsWrite(const char* data, Int64 size);
+  Int32                  hdfsWrite(const char* data, Int64 size, HDFS_Client_RetCode &hdfsClientRetcode);
+  Int32                  hdfsRead(char* data, Int64 size, HDFS_Client_RetCode &hdfsClientRetcode);
   HDFS_Client_RetCode    hdfsClose();
-  HDFS_Client_RetCode    hdfsMergeFiles(const NAString& srcPath,
-                                 const NAString& dstPath);
-  HDFS_Client_RetCode    hdfsCleanUnloadPath(const NAString& uldPath );
-  HDFS_Client_RetCode    hdfsExists(const NAString& uldPath,  NABoolean & exists );
-  HDFS_Client_RetCode    hdfsDeletePath(const NAString& delPath);
   HDFS_Client_RetCode    setHdfsFileInfo(JNIEnv *jenv, jint numFiles, jint fileNo, jboolean isDir, 
           jstring filename, jlong modTime, jlong len, jshort numReplicas, jlong blockSize, 
           jstring owner, jstring group, jshort permissions, jlong accessTime);
   HDFS_Client_RetCode    hdfsListDirectory(const char *pathStr, HDFS_FileInfo **hdfsFileInfo, int *numFiles);
-  void	                 deleteHdfsFileInfo();
+  static HDFS_Client_RetCode    hdfsMergeFiles(const NAString& srcPath, const NAString& dstPath);
+  static HDFS_Client_RetCode    hdfsCleanUnloadPath(const NAString& uldPath );
+  static HDFS_Client_RetCode    hdfsExists(const NAString& uldPath,  NABoolean & exists );
+  static HDFS_Client_RetCode    hdfsDeletePath(const NAString& delPath);
+  static HDFS_Client_RetCode    getHiveTableMaxModificationTs(Int64& maxModificationTs, const char * tableDirPaths,  int levelDeep);
+  // Get the default file system name (fs.defaultFS).
+  // buffer is the buffer pre-allocated to hold the result
+  // buf_len is the length of the buffer in bytes
+  static HDFS_Client_RetCode    getFsDefaultName(char* buffer, Int32 buf_len);
+  static HDFS_Client_RetCode    hdfsCreateDirectory(const NAString& path);
 
 private:  
   enum JAVA_METHODS {
@@ -171,16 +204,27 @@ private:
     JM_HDFS_CREATE,
     JM_HDFS_OPEN,
     JM_HDFS_WRITE,
+    JM_HDFS_READ,
     JM_HDFS_CLOSE,
     JM_HDFS_MERGE_FILES,
     JM_HDFS_CLEAN_UNLOAD_PATH,
     JM_HDFS_EXISTS,
     JM_HDFS_DELETE_PATH,
     JM_HDFS_LIST_DIRECTORY,
+    JM_HIVE_TBL_MAX_MODIFICATION_TS,
+    JM_GET_FS_DEFAULT_NAME,
+    JM_HDFS_CREATE_DIRECTORY,
     JM_LAST
   };
+
+  void deleteHdfsFileInfo();
+  void setPath(const char *path);
+
   HDFS_FileInfo *hdfsFileInfo_; 
   int numFiles_;
+  char *path_;
+  Int64 totalBytesWritten_;
+  ExHdfsScanStats *hdfsStats_;
   static jclass javaClass_;
   static JavaMethodInit* JavaMethods_;
   static bool javaMethodsInitialized_;

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/JavaObjectInterface.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/JavaObjectInterface.cpp b/core/sql/executor/JavaObjectInterface.cpp
index 6919866..2260715 100644
--- a/core/sql/executor/JavaObjectInterface.cpp
+++ b/core/sql/executor/JavaObjectInterface.cpp
@@ -24,6 +24,7 @@
 #include "JavaObjectInterface.h"
 #include "QRLogger.h"
 #include "Globals.h"
+#include "Context.h"
 #include "ComUser.h"
 #include "LmJavaOptions.h"
 #include "ex_ex.h"
@@ -546,7 +547,7 @@ NABoolean  JavaObjectInterface::getExceptionDetails(JNIEnv *jenv)
    if (jenv == NULL)
        jenv = jenv_;
    CliGlobals *cliGlobals = GetCliGlobals();
-   NAString error_msg(heap_);
+   NAString error_msg(cliGlobals->currContext()->exHeap());
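+   // (The context heap is used above because getExceptionDetails() is
+   // now a static method -- see the header change below -- so there is
+   // no per-object heap_ to draw from.)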
    if (jenv == NULL)
    {
       error_msg = "Internal Error - Unable to obtain jenv";

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/JavaObjectInterface.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/JavaObjectInterface.h b/core/sql/executor/JavaObjectInterface.h
index b167420..d07b6f0 100644
--- a/core/sql/executor/JavaObjectInterface.h
+++ b/core/sql/executor/JavaObjectInterface.h
@@ -119,12 +119,12 @@ protected:
   // Get the error description.
   static char* getErrorText(JOI_RetCode errEnum);
  
-  NAString getLastError();
+  static NAString getLastError();
 
   // Write the description of a Java error to the log file.
-  void logError(std::string &cat, const char* methodName, const char *result);
-  void logError(std::string &cat, const char* methodName, jstring jresult);
-  void logError(std::string &cat, const char* file, int line);
+  static void logError(std::string &cat, const char* methodName, const char *result);
+  static void logError(std::string &cat, const char* methodName, jstring jresult);
+  static void logError(std::string &cat, const char* file, int line);
 
   static JOI_RetCode initJNIEnv();
   static char* buildClassPath();  
@@ -145,9 +145,9 @@ public:
   }
   // Pass in jenv if the thread where the object is created is different than
   // the thread where exception occurred
-  NABoolean getExceptionDetails(JNIEnv *jenv = NULL);  
+  static NABoolean getExceptionDetails(JNIEnv *jenv = NULL);  
 
-  void appendExceptionMessages(JNIEnv *jenv, jthrowable a_exception, NAString &error_msg);
+  static void appendExceptionMessages(JNIEnv *jenv, jthrowable a_exception, NAString &error_msg);
   
   NAHeap *getHeap() { return heap_; }
 protected:

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/OrcFileReader.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.h b/core/sql/executor/OrcFileReader.h
index 925456b..536235a 100644
--- a/core/sql/executor/OrcFileReader.h
+++ b/core/sql/executor/OrcFileReader.h
@@ -90,7 +90,7 @@ public:
   
   OFR_RetCode				getRowCount(Int64& count);
 
-  virtual char*  getErrorText(OFR_RetCode errEnum);
+  static char*  getErrorText(OFR_RetCode errEnum);
 
 protected:
   jstring getLastError();

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/executor/SequenceFileReader.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/SequenceFileReader.h b/core/sql/executor/SequenceFileReader.h
index 12a68c2..a679eba 100644
--- a/core/sql/executor/SequenceFileReader.h
+++ b/core/sql/executor/SequenceFileReader.h
@@ -99,7 +99,7 @@ public:
 
   SFR_RetCode    fetchRowsIntoBuffer(Int64 stopOffset, char* buffer, Int64 buffSize, Int64& bytesRead, char rowDelimiter);
 
-  virtual char*  getErrorText(SFR_RetCode errEnum);
+  static char*  getErrorText(SFR_RetCode errEnum);
 
 protected:
   jstring getLastError();
@@ -187,7 +187,7 @@ public:
   SFW_RetCode    close();
   SFW_RetCode    release();
 
-  virtual char*  getErrorText(SFW_RetCode errEnum);
+  static char*  getErrorText(SFW_RetCode errEnum);
 
 
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/exp/ExpLOBaccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBaccess.cpp b/core/sql/exp/ExpLOBaccess.cpp
index 481e960..4b3a785 100644
--- a/core/sql/exp/ExpLOBaccess.cpp
+++ b/core/sql/exp/ExpLOBaccess.cpp
@@ -367,143 +367,58 @@ Ex_Lob_Error ExLob::writeDataSimple(char *data, Int64 size, LobsSubOper subOpera
     return LOB_OPER_OK;
 }
 
-Ex_Lob_Error ExLob::dataModCheck2(
+// numOfPartLevels: 0, if not partitioned
+//                  N, number of partitioning cols
+// failedModTS: timestamp value that caused the mismatch
+Ex_Lob_Error ExLob::dataModCheck(
        char * dirPath, 
        Int64  inputModTS,
        Lng32  numOfPartLevels,
+       ExLobGlobals *lobGlobals,
        Int64 &failedModTS,
        char  *failedLocBuf,
        Int32 *failedLocBufLen)
 {
-  if (numOfPartLevels == 0)
+  if (inputModTS <= 0)
     return LOB_OPER_OK;
 
-  Lng32 currNumFilesInDir = 0;
-  hdfsFileInfo * fileInfos = 
-    hdfsListDirectory(fs_, dirPath, &currNumFilesInDir);
-  if ((currNumFilesInDir > 0) && (fileInfos == NULL))
-    {
-      return LOB_DATA_FILE_NOT_FOUND_ERROR;
-    }
-
-  NABoolean failed = FALSE;
-  for (Lng32 i = 0; ((NOT failed) && (i < currNumFilesInDir)); i++)
-    {
-      hdfsFileInfo &fileInfo = fileInfos[i];
-      if (fileInfo.mKind == kObjectKindDirectory)
-        {
-          Int64 currModTS = fileInfo.mLastMod;
-          if ((inputModTS > 0) &&
-              (currModTS > inputModTS) &&
-	      (!strstr(fileInfo.mName, ".hive-staging_hive_")))
-            {
-              failed = TRUE;
-              failedModTS = currModTS;
+  Ex_Lob_Error result = LOB_OPER_OK;
+  HDFS_Client_RetCode rc;
+  Int64 currModTS;
 
-              if (failedLocBuf && failedLocBufLen)
-                {
-                  Lng32 failedFileLen = strlen(fileInfo.mName);
-                  Lng32 copyLen = (failedFileLen > (*failedLocBufLen-1) 
-                                   ? (*failedLocBufLen-1) : failedFileLen);
-                  
-                  str_cpy_and_null(failedLocBuf, fileInfo.mName, copyLen,
-                                   '\0', ' ', TRUE);
-                  *failedLocBufLen = copyLen;
-                }
-            }
-        }
-    }
 
-  hdfsFreeFileInfo(fileInfos, currNumFilesInDir);
-  if (failed)
-    return LOB_DATA_MOD_CHECK_ERROR;
+  failedModTS = -1;
 
-  numOfPartLevels--;
-  Ex_Lob_Error err = LOB_OPER_OK;
-  if (numOfPartLevels > 0)
+  // libhdfs returns a second-resolution timestamp,
+  // get a millisecond-resolution timestamp via JNI
+  rc = HdfsClient::getHiveTableMaxModificationTs(currModTS,
+                                                dirPath,
+                                                numOfPartLevels);
+  // check for errors and timestamp mismatches
+  if (rc != HDFS_CLIENT_OK || currModTS <= 0)
     {
-      for (Lng32 i = 0; ((NOT failed) && (i < currNumFilesInDir)); i++)
-        {
-          hdfsFileInfo &fileInfo = fileInfos[i];
-          err = dataModCheck2(fileInfo.mName, inputModTS, numOfPartLevels,
-                              failedModTS, failedLocBuf, failedLocBufLen);
-          if (err != LOB_OPER_OK)
-            return err;
-        }
+      result = LOB_DATA_READ_ERROR;
+    }
+  else if (currModTS > inputModTS)
+    {
+      result = LOB_DATA_MOD_CHECK_ERROR;
+      failedModTS = currModTS;
     }
 
-  return LOB_OPER_OK;
-}
-
-// numOfPartLevels: 0, if not partitioned
-//                  N, number of partitioning cols
-// failedModTS: timestamp value that caused the mismatch
-Ex_Lob_Error ExLob::dataModCheck(
-       char * dirPath, 
-       Int64  inputModTS,
-       Lng32  numOfPartLevels,
-       ExLobGlobals *lobGlobals,
-       Int64 &failedModTS,
-       char  *failedLocBuf,
-       Int32 *failedLocBufLen)
-{
-  failedModTS = -1;
-
-  // find mod time of root dir
-  hdfsFileInfo *fileInfos = hdfsGetPathInfo(fs_, dirPath);
-  if (fileInfos == NULL)
-    {       
+  if (result != LOB_OPER_OK && failedLocBuf && failedLocBufLen)
+    {
+      // the JNI check reports only the table's root directory, not the
+      // exact partition file that changed, so return the root path here
       Lng32 failedFileLen = strlen(dirPath);
       Lng32 copyLen = (failedFileLen > (*failedLocBufLen-1) 
                        ? (*failedLocBufLen-1) : failedFileLen);
-      Int32 hdfserror = errno;
-      char hdfsErrStr[20];
-      snprintf(hdfsErrStr,sizeof(hdfsErrStr),"(errno %d)",errno);
+
       str_cpy_and_null(failedLocBuf, dirPath, copyLen,
                        '\0', ' ', TRUE);
-      str_cat_c(failedLocBuf,hdfsErrStr);
       *failedLocBufLen = copyLen;
-      if (errno)
-        {
-          // Allow for hdfs error. AQR will find the new hive mapped files
-          // if the hive table has been remapped to new data files
-          return LOB_DATA_MOD_CHECK_ERROR;
-        }
-      else
-        return LOB_DATA_READ_ERROR;
     }
-    
-  Int64 currModTS = fileInfos[0].mLastMod;
-  if ((inputModTS > 0) &&
-      (currModTS > inputModTS))
-    {
-      hdfsFileInfo &fileInfo = fileInfos[0];
 
-      failedModTS = currModTS;
-
-      if (failedLocBuf && failedLocBufLen)
-        {
-          Lng32 failedFileLen = strlen(fileInfo.mName);
-          Lng32 copyLen = (failedFileLen > (*failedLocBufLen-1) 
-                           ? (*failedLocBufLen-1) : failedFileLen);
-          
-          str_cpy_and_null(failedLocBuf, fileInfo.mName, copyLen,
-                           '\0', ' ', TRUE);
-          *failedLocBufLen = copyLen;
-        }
-
-      hdfsFreeFileInfo(fileInfos, 1);
-      return LOB_DATA_MOD_CHECK_ERROR;
-    }
-
-  hdfsFreeFileInfo(fileInfos, 1);
-  if (numOfPartLevels > 0)
-    {
-      return dataModCheck2(dirPath, inputModTS, numOfPartLevels, 
-                           failedModTS, failedLocBuf, failedLocBufLen);
-    }
-
-  return LOB_OPER_OK;
+  return result;
 }
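+// Note: the single JNI call above covers the root directory and all
+// partition levels in one pass, replacing the recursive, per-directory
+// libhdfs traversal that the removed dataModCheck2() performed.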
 
 Ex_Lob_Error ExLob::emptyDirectory(char *dirPath,

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/exp/ExpLOBaccess.h
----------------------------------------------------------------------
diff --git a/core/sql/exp/ExpLOBaccess.h b/core/sql/exp/ExpLOBaccess.h
index 47f06df..3c5e93e 100644
--- a/core/sql/exp/ExpLOBaccess.h
+++ b/core/sql/exp/ExpLOBaccess.h
@@ -484,14 +484,6 @@ class ExLob : public NABasicObject
        char  *failedLocBuf,
        Int32 *failedLocBufLen);
 
-  Ex_Lob_Error dataModCheck2(
-       char * dirPath, 
-       Int64  modTS,
-       Lng32  numOfPartLevels,
-       Int64 &failedModTS,
-       char  *failedLocBuf,
-       Int32 *failedLocBufLen);
-
   Ex_Lob_Error emptyDirectory(char* dirPath, ExLobGlobals* lobGlobals);
 
   ExLobStats *getStats() { return &stats_; }

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/generator/GenFastTransport.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenFastTransport.cpp b/core/sql/generator/GenFastTransport.cpp
index bc788d9..75c1e0e 100644
--- a/core/sql/generator/GenFastTransport.cpp
+++ b/core/sql/generator/GenFastTransport.cpp
@@ -646,7 +646,7 @@ PhysicalFastExtract::codeGen(Generator *generator)
                  (char*)getHiveTableName().data(),
                  TRUE, // isHive
                  (char*)getTargetName().data(), // root dir
-                 hTabStats->getModificationTS(),
+                 hTabStats->getModificationTSmsec(),
                  0,
                  NULL,
                  (char*)getHdfsHostName().data(), 
@@ -657,7 +657,7 @@ PhysicalFastExtract::codeGen(Generator *generator)
       else
         {
           // sim check at leaf
-          modTS = hTabStats->getModificationTS();
+          modTS = hTabStats->getModificationTSmsec();
         }
     } // do sim check
 

http://git-wip-us.apache.org/repos/asf/trafodion/blob/e303b3a0/core/sql/generator/GenRelScan.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenRelScan.cpp b/core/sql/generator/GenRelScan.cpp
index 3d01223..df4b303 100644
--- a/core/sql/generator/GenRelScan.cpp
+++ b/core/sql/generator/GenRelScan.cpp
@@ -393,7 +393,8 @@ short FileScan::genForTextAndSeq(Generator * generator,
                                 char* &hdfsHostName,
                                 Int32 &hdfsPort,
                                 NABoolean &useCursorMulti,
-                                NABoolean &doSplitFileOpt)
+                                NABoolean &doSplitFileOpt,
+                                NABoolean &isCompressedFile)
 {
   Space * space          = generator->getSpace();
 
@@ -480,6 +481,9 @@ short FileScan::genForTextAndSeq(Generator * generator,
 	      hfi.bytesToRead_ = span;
 	      hfi.fileName_ = fnameInList;
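+              // Compressed-file detection is not enabled through this
+              // path yet, so the flag stays FALSE; the commented-out
+              // check below shows the intended logic.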
 	      
+              isCompressedFile = FALSE;
+              //if (file->getCompressionInfo().getCompressionMethod() != ComCompressionInfo::UNCOMPRESSED)
+              //   isCompressedFile = TRUE;
 	      char * hfiInList = space->allocateAndCopyToAlignedSpace
 		((char*)&hfi, sizeof(HdfsFileInfo));
 	      
@@ -1144,13 +1148,14 @@ short FileScan::codeGenForHive(Generator * generator)
   }
   NABoolean useCursorMulti = FALSE;
   NABoolean doSplitFileOpt = FALSE;
+  NABoolean isCompressedFile = FALSE;
 
   if ((hTabStats->isTextFile()) || (hTabStats->isSequenceFile()))
     {
       genForTextAndSeq(generator, 
                        hdfsFileInfoList, hdfsFileRangeBeginList, hdfsFileRangeNumList,
                        hdfsHostName, hdfsPort,
-                       useCursorMulti, doSplitFileOpt);
+                       useCursorMulti, doSplitFileOpt, isCompressedFile);
     }
   else if (hTabStats->isOrcFile())
     {
@@ -1277,7 +1282,7 @@ if (hTabStats->isOrcFile())
       (hTabStats->numOfPartCols() <= 0) &&
       (!getCommonSubExpr()))
     {
-      modTS = hTabStats->getModificationTS();
+      modTS = hTabStats->getModificationTSmsec();
       numOfPartLevels = hTabStats->numOfPartCols();
 
       // if specific directories are to checked based on the query struct
@@ -1296,7 +1301,7 @@ if (hTabStats->isOrcFile())
                  tiName,
                  TRUE, // isHive
                  (char*)hTabStats->tableDir().data(), // root dir
-                 hTabStats->getModificationTS(),
+                 modTS,
                  numOfPartLevels,
                  hdfsDirsToCheck,
                  hdfsHostName, hdfsPort);
@@ -1310,8 +1315,6 @@ if (hTabStats->isOrcFile())
             space->allocateAndCopyToAlignedSpace(hTabStats->tableDir().data(),
                                                  hTabStats->tableDir().length(),
                                                  0);
-          modTS = hTabStats->getModificationTS();
-          numOfPartLevels = hTabStats->numOfPartCols();
         }
     }
 
@@ -1394,6 +1397,8 @@ if (hTabStats->isOrcFile())
   if (CmpCommon::getDefault(USE_LIBHDFS_SCAN) == DF_ON)
      hdfsscan_tdb->setUseLibhdfsScan(TRUE);
 
+  hdfsscan_tdb->setCompressedFile(isCompressedFile);
+
   if(!generator->explainDisabled()) {
     generator->setExplainTuple(
        addExplainInfo(hdfsscan_tdb, 0, 0, generator));