You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by se...@apache.org on 2017/11/29 05:29:01 UTC

[1/3] incubator-trafodion git commit: [TRAFODION-2821] Trafodion core code base needs to be thread safe

Repository: incubator-trafodion
Updated Branches:
  refs/heads/master 6d2213d04 -> 1efa9dbdd


[TRAFODION-2821] Trafodion core code base needs to be thread safe

The method IDs of the different Java classes are now initialized
in a thread-safe manner in the JNI layer.

The missing code to initialize the thread-specific JNIEnv has been added.


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/31041e0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/31041e0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/31041e0f

Branch: refs/heads/master
Commit: 31041e0f7630cea22d866e53fe6e8c7a4d046974
Parents: 19c7544
Author: selvaganesang <se...@esgyn.com>
Authored: Tue Nov 28 13:45:41 2017 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Tue Nov 28 13:45:41 2017 +0000

----------------------------------------------------------------------
 core/sql/executor/OrcFileReader.cpp      | 222 +++++++++++++++++++-------
 core/sql/executor/OrcFileReader.h        |   4 +
 core/sql/executor/SequenceFileReader.cpp | 113 +++++++------
 3 files changed, 235 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/OrcFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.cpp b/core/sql/executor/OrcFileReader.cpp
index 7696589..d63052d 100644
--- a/core/sql/executor/OrcFileReader.cpp
+++ b/core/sql/executor/OrcFileReader.cpp
@@ -30,6 +30,8 @@
 
 JavaMethodInit* OrcFileReader::JavaMethods_ = NULL;
 jclass OrcFileReader::javaClass_ = 0;
+bool OrcFileReader::javaMethodsInitialized_ = false;
+pthread_mutex_t OrcFileReader::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
 
 static const char* const sfrErrorEnumStr[] = 
 {
@@ -43,8 +45,10 @@ static const char* const sfrErrorEnumStr[] =
  ,"Java exception in isEOF()"
  ,"Java exception in fetchNextRow()"
  ,"Java exception in close()"
+ ,"Java exception in getRowNum()"
 };
 
+
 //////////////////////////////////////////////////////////////////////////////
 // 
 //////////////////////////////////////////////////////////////////////////////
@@ -70,11 +74,29 @@ OrcFileReader::~OrcFileReader()
 OFR_RetCode OrcFileReader::init()
 {
   static char className[]="org/trafodion/sql/OrcFileReader";
-  
-  if (JavaMethods_)
-    return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, TRUE);       
-  else
+
+  OFR_RetCode lv_retcode = OFR_OK;
+ 
+  QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER,
+		LL_DEBUG,
+		"Enter OrcFileReader::init()");
+
+  if (isInitialized())
+    return lv_retcode;
+
+  if (javaMethodsInitialized_)
+    return (OFR_RetCode)JavaObjectInterface::init(className, 
+							javaClass_, 
+							JavaMethods_, 
+                                                        (Int32)JM_LAST, javaMethodsInitialized_);
+  else 
   {
+    pthread_mutex_lock(&javaMethodsInitMutex_);
+    if (javaMethodsInitialized_)
+    {
+      pthread_mutex_unlock(&javaMethodsInitMutex_);
+      return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
+    }
     JavaMethods_ = new JavaMethodInit[JM_LAST];
     
     JavaMethods_[JM_CTOR      ].jm_name      = "<init>";
@@ -111,9 +133,15 @@ OFR_RetCode OrcFileReader::init()
 //    JavaMethods_[JM_FETCHBUFF2].jm_signature = "(II)[Ljava/lang/String;";
     JavaMethods_[JM_CLOSE     ].jm_name      = "close";
     JavaMethods_[JM_CLOSE     ].jm_signature = "()Ljava/lang/String;";
-   
-    return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, FALSE);
+    
+    lv_retcode = (OFR_RetCode)JavaObjectInterface::init(className,
+							javaClass_,
+							JavaMethods_,
+							(Int32)JM_LAST, javaMethodsInitialized_);
+    javaMethodsInitialized_ = TRUE;
+    pthread_mutex_unlock(&javaMethodsInitMutex_);
   }
+  return lv_retcode;
 }
 	
 //////////////////////////////////////////////////////////////////////////////
@@ -122,24 +150,28 @@ OFR_RetCode OrcFileReader::init()
 OFR_RetCode OrcFileReader::open(const char* path)
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::open(%s) called.", path);
+
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_OPEN_PARAM;
   jstring js_path = jenv_->NewStringUTF(path);
   if (js_path == NULL) 
+  {
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_OPEN_PARAM;
-
+  }
   // String open(java.lang.String);
   tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path);
 
-  jenv_->DeleteLocalRef(js_path);  
-
-  if (jresult != NULL)
+  if (jenv_->ExceptionCheck())
   {
-  	const char *my_string = jenv_->GetStringUTFChars(jresult, JNI_FALSE);
-  	printf("open error: %s\n", my_string);
+    getExceptionDetails();
     logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::open()", jresult);
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_OPEN_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -149,18 +181,32 @@ OFR_RetCode OrcFileReader::open(const char* path)
 OFR_RetCode OrcFileReader::getPosition(Int64& pos)
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::getPosition(%ld) called.", pos);
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_GETPOS_EXCEPTION;
 
   // long getPosition();
   tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name;
   Int64 result = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_GETPOS].methodID);
 
-  if (result == -1) 
+  if (jenv_->ExceptionCheck())
   {
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
     logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::getPosition()", getLastError());
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_GETPOS_EXCEPTION;
+  }
+
+  if (result == -1) {
+    logError(CAT_SQL_HDFS_ORC_FILE_READER,
+	     "OrcFileReader::getPosition()",
+	     getLastError());
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_GETPOS_EXCEPTION;
   }
 
   pos = result;
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -169,23 +215,41 @@ OFR_RetCode OrcFileReader::getPosition(Int64& pos)
 //////////////////////////////////////////////////////////////////////////////
 OFR_RetCode OrcFileReader::seeknSync(Int64 pos)
 {
-	Int64 orcPos;
+  Int64 orcPos;
+
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_SYNC_EXCEPTION;
 	
-	orcPos = pos -1;	//When you position in ORC, reading the NEXT row will be one greater than what you wanted.
+  orcPos = pos -1;	//When you position in ORC, reading the NEXT row will be one greater than what you wanted.
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::seeknSync(%ld) called.", pos);
 
   // String seeknSync(long);
   tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_SYNC].methodID, orcPos);
 
-  if (jresult != NULL)
+  if (jenv_->ExceptionCheck())
   {
-  	const char *my_string = jenv_->GetStringUTFChars(jresult, JNI_FALSE);
-  	printf("seeknSync error: %s\n", my_string);
-    logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::seeknSync()", jresult);
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_SYNC_EXCEPTION;
   }
-  
+
+  if (jresult != NULL) {
+    const char *my_string = jenv_->GetStringUTFChars(jresult,
+						     JNI_FALSE);
+    QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER,
+		  LL_DEBUG,
+		  "OrcFileReader::seeknSync(%ld) error: %s\n",
+		  pos,
+		  my_string);
+    logError(CAT_SQL_HDFS_ORC_FILE_READER,
+	     "OrcFileReader::seeknSync()",
+	     jresult);
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_SYNC_EXCEPTION;
+  }
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -196,11 +260,22 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::isEOF() called.");
 
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_ISEOF_EXCEPTION;
   // boolean isEOF();
   tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name;
-  bool result = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_ISEOF].methodID);
+  bool result = jenv_->CallBooleanMethod(javaObj_,
+					 JavaMethods_[JM_ISEOF].methodID);
 
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_ISEOF_EXCEPTION;
+  }
   isEOF = result;
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -211,11 +286,14 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
 OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& rowNumber, int& num_columns)
 {
 /*
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_FETCHROW_EXCEPTION;
   // java.lang.String fetchNextRow(long stopOffset);
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
   if (jresult==NULL && getLastError()) 
   {
     logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_FETCHROW_EXCEPTION;
   }
 
@@ -227,7 +305,7 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long&
   const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
   strcpy(buffer, char_result);
   jenv_->ReleaseStringUTFChars(jresult, char_result);
-  jenv_->DeleteLocalRef(jresult);  
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 */
 
@@ -236,58 +314,71 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long&
 
         tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name;
 	jobject jresult = (jobject)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID);
+    if (jenv_->ExceptionCheck()) 
+    {
+      getExceptionDetails();
+      logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+      logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
+      jenv_->PopLocalFrame(NULL);
+      return OFR_ERROR_FETCHROW_EXCEPTION;
+    }
 	if (jresult==NULL && getLastError()) 
   	{
-		  logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
-		  return OFR_ERROR_FETCHROW_EXCEPTION;
+	  logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
+	  return OFR_ERROR_FETCHROW_EXCEPTION;
   	}
 
 	if (jresult == NULL)
-		return (OFR_NOMORE);		//No more rows
+	   return (OFR_NOMORE);		//No more rows
 
 //Retrieve row and associated data		
 	jclass cls = jenv_->GetObjectClass(jresult);
 	
 	fid = jenv_->GetFieldID(cls,"m_row_length","I");
 	if (fid ==NULL)
-		{
-			return (OFR_ERROR_FETCHROW_EXCEPTION);
-		}		
+	{
+           jenv_->PopLocalFrame(NULL);
+	   return (OFR_ERROR_FETCHROW_EXCEPTION);
+	}		
 	jint row_length = (jint)jenv_->GetIntField(jresult, fid);
 	array_length = (long)row_length;
 
-	
 	fid = jenv_->GetFieldID(cls,"m_column_count","I");
 	if (fid ==NULL)
-		{
-			return(OFR_ERROR_FETCHROW_EXCEPTION);
-		}
+	{
+           jenv_->PopLocalFrame(NULL);
+           return(OFR_ERROR_FETCHROW_EXCEPTION);
+	}
 	jint column_count = (jint)jenv_->GetIntField(jresult, fid);
 	num_columns = column_count;
 
 	fid = jenv_->GetFieldID(cls,"m_row_number","J");
 	if (fid ==NULL)
-		{
-			return(OFR_ERROR_FETCHROW_EXCEPTION);
-		}
+	{
+           jenv_->PopLocalFrame(NULL);
+	   return(OFR_ERROR_FETCHROW_EXCEPTION);
+	}
 	jlong rowNum = (jlong)jenv_->GetIntField(jresult, fid);
 	rowNumber = rowNum;
-
-	
 // Get the actual row (it is a byte array). Use the row_length above to specify how much to copy	
 	fid = jenv_->GetFieldID(cls,"m_row_ba","[B");
 	if (fid ==NULL)
-		{
-			return (OFR_ERROR_FETCHROW_EXCEPTION);
-		}
+	{
+           jenv_->PopLocalFrame(NULL);
+	   return (OFR_ERROR_FETCHROW_EXCEPTION);
+	}
 	jbyteArray jrow = (jbyteArray)jenv_->GetObjectField(jresult, fid);
 
-  if (jrow == NULL)
-  		return (OFR_ERROR_FETCHROW_EXCEPTION);
+        if (jrow == NULL)
+        {
+           jenv_->PopLocalFrame(NULL);
+           return (OFR_ERROR_FETCHROW_EXCEPTION);
+        }
 
-		jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
- 		jenv_->DeleteLocalRef(jrow);  
+	jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
+ 	jenv_->DeleteLocalRef(jrow);  
 
+  jenv_->PopLocalFrame(NULL);
   return (OFR_OK);
 }
 
@@ -297,22 +388,31 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long&
 OFR_RetCode OrcFileReader::close()
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::close() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return OFR_OK;
-  }
+
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_GETPOS_EXCEPTION;
     
   // String close();
   tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID);
 
-  if (jresult!=NULL) 
+  if (jenv_->ExceptionCheck()) 
   {
-    logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::close()", jresult);
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_CLOSE_EXCEPTION;
+  }
+
+  if (jresult!=NULL) {
+    logError(CAT_SQL_HDFS_ORC_FILE_READER,
+	     "OrcFileReader::close()",
+	     jresult);
+    jenv_->PopLocalFrame(NULL);
     return OFR_ERROR_CLOSE_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -322,16 +422,22 @@ OFR_RetCode OrcFileReader::close()
 OFR_RetCode OrcFileReader::getRowCount(Int64& count)
 {
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::getRowCount() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return OFR_OK;
-  }
-    
+  if (initJNIEnv() != JOI_OK)
+     return OFR_ERROR_GETNUMROWS_EXCEPTION;
+
   tsRecentJMFromJNI = JavaMethods_[JM_GETNUMROWS].jm_full_name;
   jlong jresult = (jlong)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_GETNUMROWS].methodID);
   count = jresult;
   
+  if (jenv_->ExceptionCheck())
+  {
+    getExceptionDetails();
+    logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+    jenv_->PopLocalFrame(NULL);
+    return OFR_ERROR_GETNUMROWS_EXCEPTION;
+  }
+
+  jenv_->PopLocalFrame(NULL);
   return OFR_OK;
 }
 
@@ -388,5 +494,5 @@ Removed until implemented
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "  =>Returning %d, read %ld bytes in %d rows.", retCode, bytesRead, rowsRead);
   return retCode;
 */
-	return (OFR_NOMORE);
+  return OFR_OK;
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/OrcFileReader.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.h b/core/sql/executor/OrcFileReader.h
index c9c491b..925456b 100644
--- a/core/sql/executor/OrcFileReader.h
+++ b/core/sql/executor/OrcFileReader.h
@@ -42,6 +42,7 @@ typedef enum {
  ,OFR_ERROR_ISEOF_EXCEPTION     // Java exception in isEOF()
  ,OFR_ERROR_FETCHROW_EXCEPTION  // Java exception in fetchNextRow()
  ,OFR_ERROR_CLOSE_EXCEPTION     // Java exception in close()
+ ,OFR_ERROR_GETNUMROWS_EXCEPTION
  ,OFR_LAST
 } OFR_RetCode;
 
@@ -115,6 +116,9 @@ private:
  
   static jclass javaClass_;
   static JavaMethodInit* JavaMethods_;
+  static bool javaMethodsInitialized_;
+  // this mutex protects both JaveMethods_ and javaClass_ initialization
+  static pthread_mutex_t javaMethodsInitMutex_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/SequenceFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/SequenceFileReader.cpp b/core/sql/executor/SequenceFileReader.cpp
index 0ef0399..a67db87 100644
--- a/core/sql/executor/SequenceFileReader.cpp
+++ b/core/sql/executor/SequenceFileReader.cpp
@@ -176,9 +176,12 @@ SFR_RetCode SequenceFileReader::init()
 SFR_RetCode SequenceFileReader::open(const char* path)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::open(%s) called.", path);
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_OPEN_PARAM;
   jstring js_path = jenv_->NewStringUTF(path);
   if (js_path == NULL) 
-    return SFR_ERROR_OPEN_PARAM;
+     jenv_->PopLocalFrame(NULL);
+     return SFR_ERROR_OPEN_PARAM;
 
   // String open(java.lang.String);
   tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
@@ -189,9 +192,10 @@ SFR_RetCode SequenceFileReader::open(const char* path)
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::open()", jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_OPEN_EXCEPTION;
   }
-  
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -202,6 +206,9 @@ SFR_RetCode SequenceFileReader::getPosition(Int64& pos)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::getPosition(%ld) called.", pos);
 
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_GETPOS_EXCEPTION;
+
   // long getPosition();
   tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name;
   Int64 result = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_GETPOS].methodID);
@@ -209,10 +216,12 @@ SFR_RetCode SequenceFileReader::getPosition(Int64& pos)
   if (result == -1) 
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::getPosition()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_GETPOS_EXCEPTION;
   }
 
   pos = result;
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -223,6 +232,9 @@ SFR_RetCode SequenceFileReader::seeknSync(Int64 pos)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::seeknSync(%ld) called.", pos);
 
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_GETPOS_EXCEPTION;
+
   // String seeknSync(long);
   tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_SYNC].methodID, pos);
@@ -230,9 +242,11 @@ SFR_RetCode SequenceFileReader::seeknSync(Int64 pos)
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::seeknSync()", jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_SYNC_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -243,10 +257,13 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::isEOF() called.");
 
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_ISEOF_EXCEPTION;
   // boolean isEOF();
   tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name;
   bool result = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_ISEOF].methodID);
 
+  jenv_->PopLocalFrame(NULL);
   isEOF = result;
   return SFR_OK;
 }
@@ -279,24 +296,29 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
 //////////////////////////////////////////////////////////////////////////////
 SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
 {
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_FETCHROW_EXCEPTION;
+
   // java.lang.String fetchNextRow(long stopOffset);
   tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
   if (jresult==NULL && getLastError()) 
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::fetchNextRow()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_FETCHROW_EXCEPTION;
   }
 
   if (jresult == NULL)
   {
+    jenv_->PopLocalFrame(NULL);
     return SFR_NOMORE;
   }
   
   const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
   strcpy(buffer, char_result);
   jenv_->ReleaseStringUTFChars(jresult, char_result);
-  jenv_->DeleteLocalRef(jresult);  
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -306,11 +328,9 @@ SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
 SFR_RetCode SequenceFileReader::close()
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::close() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return SFR_OK;
-  }
+
+  if (initJNIEnv() != JOI_OK)
+     return SFR_ERROR_CLOSE_EXCEPTION;
     
   // String close();
   tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
@@ -319,9 +339,11 @@ SFR_RetCode SequenceFileReader::close()
   if (jresult!=NULL) 
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::close()", jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFR_ERROR_CLOSE_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return SFR_OK;
 }
 
@@ -535,22 +557,26 @@ SFW_RetCode SequenceFileWriter::init()
 SFW_RetCode SequenceFileWriter::open(const char* path, SFW_CompType compression)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::open(%s) called.", path);
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_OPEN_PARAM;
   jstring js_path = jenv_->NewStringUTF(path);
-  if (js_path == NULL) 
-    return SFW_ERROR_OPEN_PARAM;
+  if (js_path == NULL) {
+     jenv_->PopLocalFrame(NULL);
+     return SFW_ERROR_OPEN_PARAM;
+  }
 
   // String open(java.lang.String);
   tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path, compression);
 
-  jenv_->DeleteLocalRef(js_path);  
-
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::open()", jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_OPEN_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return SFW_OK;
 }
 
@@ -560,9 +586,14 @@ SFW_RetCode SequenceFileWriter::open(const char* path, SFW_CompType compression)
 SFW_RetCode SequenceFileWriter::write(const char* data)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::write(%s) called.", data);
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_WRITE_PARAM;
+
   jstring js_data = jenv_->NewStringUTF(data);
-  if (js_data == NULL) 
+  if (js_data == NULL) {
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_WRITE_PARAM;
+  }
 
   // String write(java.lang.String);
   tsRecentJMFromJNI = JavaMethods_[JM_WRITE].jm_full_name;
@@ -573,9 +604,11 @@ SFW_RetCode SequenceFileWriter::write(const char* data)
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::write()", jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_WRITE_EXCEPTION;
   }
   
+  jenv_->PopLocalFrame(NULL);
   return SFW_OK;
 }
 
@@ -616,12 +649,10 @@ SFW_RetCode SequenceFileWriter::writeBuffer(char* data, Int64 buffSize, const ch
 SFW_RetCode SequenceFileWriter::close()
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::close() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return SFW_OK;
-  }
     
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_CLOSE_EXCEPTION;
+
   // String close();
   tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID);
@@ -629,10 +660,11 @@ SFW_RetCode SequenceFileWriter::close()
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::close()", jresult);
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_CLOSE_EXCEPTION;
   }
   
-  
+  jenv_->PopLocalFrame(NULL);
   return SFW_OK;
 }
 
@@ -644,10 +676,8 @@ SFW_RetCode SequenceFileWriter::hdfsCreate(const char* path, NABoolean compress)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::hdfsCreate(%s) called.", path);
 
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
-     return SFW_ERROR_HDFS_WRITE_EXCEPTION;
-  }
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_HDFS_CREATE_PARAM;
 
   jstring js_path = jenv_->NewStringUTF(path);
   if (js_path == NULL) {
@@ -688,10 +718,8 @@ SFW_RetCode SequenceFileWriter::hdfsWrite(const char* data, Int64 len)
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::hdfsWrite(%ld) called.", len);
 
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
+  if (initJNIEnv() != JOI_OK)
      return SFW_ERROR_HDFS_WRITE_EXCEPTION;
-  }
 
   //Write the requisite bytes into the file
   jbyteArray jbArray = jenv_->NewByteArray( len);
@@ -733,11 +761,9 @@ SFW_RetCode SequenceFileWriter::hdfsWrite(const char* data, Int64 len)
 SFW_RetCode SequenceFileWriter::hdfsClose()
 {
   QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::close() called.");
-  if (javaObj_ == NULL)
-  {
-    // Maybe there was an initialization error.
-    return SFW_OK;
-  }
+
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
 
   // String close();
   tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLOSE].jm_full_name;
@@ -748,27 +774,27 @@ SFW_RetCode SequenceFileWriter::hdfsClose()
     getExceptionDetails();
     logError(CAT_SQL_HBASE, __FILE__, __LINE__);
     logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
   }
 
   if (jresult == false)
   {
     logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError());
+    jenv_->PopLocalFrame(NULL);
     return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
   }
 
-
+  jenv_->PopLocalFrame(NULL);
   return SFW_OK;
 }
 
 SFW_RetCode SequenceFileWriter::hdfsCleanUnloadPath( const NAString& uldPath)
 {
   QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsCleanUnloadPath(%s) called.",
-                                                      uldPath.data());
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
-     return SFW_ERROR_HDFS_CLEANUP_EXCEPTION;
-  }
+                                                                             uldPath.data());
+  if (initJNIEnv() != JOI_OK)
+     return SFW_ERROR_HDFS_CLEANUP_PARAM;
 
   jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
   if (js_UldPath == NULL) {
@@ -799,10 +825,9 @@ SFW_RetCode SequenceFileWriter::hdfsMergeFiles( const NAString& srcPath,
   QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsMergeFiles(%s, %s) called.",
                   srcPath.data(), dstPath.data());
 
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
+  if (initJNIEnv() != JOI_OK)
      return SFW_ERROR_HDFS_MERGE_FILES_EXCEPTION;
-  }
+
   jstring js_SrcPath = jenv_->NewStringUTF(srcPath.data());
 
   if (js_SrcPath == NULL) {
@@ -845,10 +870,8 @@ SFW_RetCode SequenceFileWriter::hdfsDeletePath( const NAString& delPath)
 {
   QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsDeletePath(%s called.",
                   delPath.data());
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
+  if (initJNIEnv() != JOI_OK)
      return SFW_ERROR_HDFS_DELETE_PATH_EXCEPTION;
-  }
 
   jstring js_delPath = jenv_->NewStringUTF(delPath.data());
   if (js_delPath == NULL) {
@@ -887,10 +910,8 @@ SFW_RetCode SequenceFileWriter::hdfsExists( const NAString& uldPath, NABoolean &
   QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsExists(%s) called.",
                                                       uldPath.data());
 
-  if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
-     getExceptionDetails();
+  if (initJNIEnv() != JOI_OK)
      return SFW_ERROR_HDFS_EXISTS_EXCEPTION;
-  }
 
   jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
   if (js_UldPath == NULL) {


[2/3] incubator-trafodion git commit: [TRAFODION-2821] Trafodion core code base needs to be thread safe

Posted by se...@apache.org.
[TRAFODION-2821] Trafodion core code base needs to be thread safe

Changes as per review


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/46fa80fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/46fa80fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/46fa80fa

Branch: refs/heads/master
Commit: 46fa80faa10b5aadeffe1bef7a3bd0d4d5d2ac73
Parents: 31041e0
Author: selvaganesang <se...@esgyn.com>
Authored: Tue Nov 28 21:16:38 2017 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Tue Nov 28 21:16:38 2017 +0000

----------------------------------------------------------------------
 core/sql/executor/OrcFileReader.cpp      |  6 +--
 core/sql/executor/SequenceFileReader.cpp | 78 ---------------------------
 2 files changed, 3 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/46fa80fa/core/sql/executor/OrcFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.cpp b/core/sql/executor/OrcFileReader.cpp
index d63052d..ddaa27a 100644
--- a/core/sql/executor/OrcFileReader.cpp
+++ b/core/sql/executor/OrcFileReader.cpp
@@ -285,9 +285,10 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
 //OFR_RetCode OrcFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
 OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& rowNumber, int& num_columns)
 {
-/*
+
   if (initJNIEnv() != JOI_OK)
      return OFR_ERROR_FETCHROW_EXCEPTION;
+/*
   // java.lang.String fetchNextRow(long stopOffset);
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
   if (jresult==NULL && getLastError()) 
@@ -376,7 +377,6 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long&
         }
 
 	jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
- 	jenv_->DeleteLocalRef(jrow);  
 
   jenv_->PopLocalFrame(NULL);
   return (OFR_OK);
@@ -494,5 +494,5 @@ Removed until implemented
   QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "  =>Returning %d, read %ld bytes in %d rows.", retCode, bytesRead, rowsRead);
   return retCode;
 */
-  return OFR_OK;
+  return (OFR_NOMORE);
 }

http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/46fa80fa/core/sql/executor/SequenceFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/SequenceFileReader.cpp b/core/sql/executor/SequenceFileReader.cpp
index a67db87..a12673d 100644
--- a/core/sql/executor/SequenceFileReader.cpp
+++ b/core/sql/executor/SequenceFileReader.cpp
@@ -187,8 +187,6 @@ SFR_RetCode SequenceFileReader::open(const char* path)
   tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path);
 
-  jenv_->DeleteLocalRef(js_path);  
-
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::open()", jresult);
@@ -271,29 +269,6 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
 //////////////////////////////////////////////////////////////////////////////
 // 
 //////////////////////////////////////////////////////////////////////////////
-//char** SequenceFileReader::fetchArrayOfColumns()
-//{
-//  // java.lang.String[] fetchArrayOfColumns();
-//  jobjectArray jresult = (jobjectArray)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHCOLS].methodID);
-//  if (jenv_->ExceptionCheck()) 
-//  {
-//    jenv_->ExceptionDescribe();
-//    jenv_->ExceptionClear();
-//    jenv_->DeleteLocalRef(jresult);
-//    return NULL;
-//  }
-//
-//  if (jresult == NULL)
-//  {
-//    return NULL;
-//  }
-//	
-//  return SequenceFileReader::JStringArray2CharsArray(jresult);
-//}
-
-//////////////////////////////////////////////////////////////////////////////
-// 
-//////////////////////////////////////////////////////////////////////////////
 SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
 {
   if (initJNIEnv() != JOI_OK)
@@ -361,57 +336,6 @@ jstring SequenceFileReader::getLastError()
 //////////////////////////////////////////////////////////////////////////////
 // 
 //////////////////////////////////////////////////////////////////////////////
-//char** SequenceFileReader::JStringArray2CharsArray(jobjectArray jarray)
-//{
-//  char **chars_array;
-//  int len;
-//  int i;
-//  const char *ret_val;
-//  jstring jst_ret;
-//
-//  len = jenv_->GetArrayLength(jarray);
-//
-//  chars_array = (char **)malloc(sizeof(char*) * (len + 1));
-//  if (chars_array == NULL) 
-//  {
-//    //TRACE(stderr, "<%s:%d> malloc() failed\n", __FILE__, __LINE__);
-//    return NULL;
-//  }
-//
-//  for (i = 0; i < len; i++) 
-//  {
-//    ret_val = "";
-//
-//    jst_ret = (jstring)jenv_->GetObjectArrayElement(jarray, i);
-//    if (jst_ret != NULL) 
-//    {
-//      ret_val = jenv_->GetStringUTFChars(jst_ret, 0);
-//    }
-//
-//    //TRACE(stderr, "<%s:%d> %d) => %s\n", __FILE__, __LINE__, i, ret_val);
-//
-//    chars_array[i] = strdup(ret_val);
-//    if (chars_array[i] == NULL) 
-//    {
-//      // TODO: Add error handling
-//      return NULL;
-//    }
-//
-//    if (jst_ret != NULL) 
-//    {
-//      jenv_->ReleaseStringUTFChars(jst_ret, ret_val);
-//      jenv_->DeleteLocalRef(jst_ret);  
-//    }
-//  }
-//  chars_array[i] = NULL;
-//  jenv_->DeleteLocalRef(jarray);  
-//
-//  return chars_array;
-//}
-
-//////////////////////////////////////////////////////////////////////////////
-// 
-//////////////////////////////////////////////////////////////////////////////
 SFR_RetCode SequenceFileReader::fetchRowsIntoBuffer(Int64   stopOffset, 
                                                     char*   buffer, 
                                                     Int64   buffSize, 
@@ -599,8 +523,6 @@ SFW_RetCode SequenceFileWriter::write(const char* data)
   tsRecentJMFromJNI = JavaMethods_[JM_WRITE].jm_full_name;
   jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_WRITE].methodID, js_data);
 
-  jenv_->DeleteLocalRef(js_data);  
-
   if (jresult != NULL)
   {
     logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::write()", jresult);


[3/3] incubator-trafodion git commit: Merge PR 1314 [TRAFODION-2821] Trafodion core code base needs to be thread safe

Posted by se...@apache.org.
Merge PR 1314 [TRAFODION-2821] Trafodion core code base needs to be thread safe


Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/1efa9dbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/1efa9dbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/1efa9dbd

Branch: refs/heads/master
Commit: 1efa9dbddbaa2308928197d57e4c6fe9461d8bcc
Parents: 6d2213d 46fa80f
Author: selvaganesang <se...@apache.org>
Authored: Wed Nov 29 05:27:15 2017 +0000
Committer: selvaganesang <se...@apache.org>
Committed: Wed Nov 29 05:27:15 2017 +0000

----------------------------------------------------------------------
 core/sql/executor/OrcFileReader.cpp      | 222 +++++++++++++++++++-------
 core/sql/executor/OrcFileReader.h        |   4 +
 core/sql/executor/SequenceFileReader.cpp | 191 ++++++++--------------
 3 files changed, 235 insertions(+), 182 deletions(-)
----------------------------------------------------------------------