You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by se...@apache.org on 2017/11/29 05:29:01 UTC
[1/3] incubator-trafodion git commit: [TRAFODION-2821] Trafodion core
code base needs to be thread safe
Repository: incubator-trafodion
Updated Branches:
refs/heads/master 6d2213d04 -> 1efa9dbdd
[TRAFODION-2821] Trafodion core code base needs to be thread safe
The method Ids of the different Java classes are now initialized
in a thread safe manner in JNI layer.
The missing code to initialize the thread specific JNIEnv are added.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/31041e0f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/31041e0f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/31041e0f
Branch: refs/heads/master
Commit: 31041e0f7630cea22d866e53fe6e8c7a4d046974
Parents: 19c7544
Author: selvaganesang <se...@esgyn.com>
Authored: Tue Nov 28 13:45:41 2017 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Tue Nov 28 13:45:41 2017 +0000
----------------------------------------------------------------------
core/sql/executor/OrcFileReader.cpp | 222 +++++++++++++++++++-------
core/sql/executor/OrcFileReader.h | 4 +
core/sql/executor/SequenceFileReader.cpp | 113 +++++++------
3 files changed, 235 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/OrcFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.cpp b/core/sql/executor/OrcFileReader.cpp
index 7696589..d63052d 100644
--- a/core/sql/executor/OrcFileReader.cpp
+++ b/core/sql/executor/OrcFileReader.cpp
@@ -30,6 +30,8 @@
JavaMethodInit* OrcFileReader::JavaMethods_ = NULL;
jclass OrcFileReader::javaClass_ = 0;
+bool OrcFileReader::javaMethodsInitialized_ = false;
+pthread_mutex_t OrcFileReader::javaMethodsInitMutex_ = PTHREAD_MUTEX_INITIALIZER;
static const char* const sfrErrorEnumStr[] =
{
@@ -43,8 +45,10 @@ static const char* const sfrErrorEnumStr[] =
,"Java exception in isEOF()"
,"Java exception in fetchNextRow()"
,"Java exception in close()"
+ ,"Java exception in getRowNum()"
};
+
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
@@ -70,11 +74,29 @@ OrcFileReader::~OrcFileReader()
OFR_RetCode OrcFileReader::init()
{
static char className[]="org/trafodion/sql/OrcFileReader";
-
- if (JavaMethods_)
- return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, TRUE);
- else
+
+ OFR_RetCode lv_retcode = OFR_OK;
+
+ QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER,
+ LL_DEBUG,
+ "Enter OrcFileReader::init()");
+
+ if (isInitialized())
+ return lv_retcode;
+
+ if (javaMethodsInitialized_)
+ return (OFR_RetCode)JavaObjectInterface::init(className,
+ javaClass_,
+ JavaMethods_,
+ (Int32)JM_LAST, javaMethodsInitialized_);
+ else
{
+ pthread_mutex_lock(&javaMethodsInitMutex_);
+ if (javaMethodsInitialized_)
+ {
+ pthread_mutex_unlock(&javaMethodsInitMutex_);
+ return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, javaMethodsInitialized_);
+ }
JavaMethods_ = new JavaMethodInit[JM_LAST];
JavaMethods_[JM_CTOR ].jm_name = "<init>";
@@ -111,9 +133,15 @@ OFR_RetCode OrcFileReader::init()
// JavaMethods_[JM_FETCHBUFF2].jm_signature = "(II)[Ljava/lang/String;";
JavaMethods_[JM_CLOSE ].jm_name = "close";
JavaMethods_[JM_CLOSE ].jm_signature = "()Ljava/lang/String;";
-
- return (OFR_RetCode)JavaObjectInterface::init(className, javaClass_, JavaMethods_, (Int32)JM_LAST, FALSE);
+
+ lv_retcode = (OFR_RetCode)JavaObjectInterface::init(className,
+ javaClass_,
+ JavaMethods_,
+ (Int32)JM_LAST, javaMethodsInitialized_);
+ javaMethodsInitialized_ = TRUE;
+ pthread_mutex_unlock(&javaMethodsInitMutex_);
}
+ return lv_retcode;
}
//////////////////////////////////////////////////////////////////////////////
@@ -122,24 +150,28 @@ OFR_RetCode OrcFileReader::init()
OFR_RetCode OrcFileReader::open(const char* path)
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::open(%s) called.", path);
+
+ if (initJNIEnv() != JOI_OK)
+ return OFR_ERROR_OPEN_PARAM;
jstring js_path = jenv_->NewStringUTF(path);
if (js_path == NULL)
+ {
+ jenv_->PopLocalFrame(NULL);
return OFR_ERROR_OPEN_PARAM;
-
+ }
// String open(java.lang.String);
tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path);
- jenv_->DeleteLocalRef(js_path);
-
- if (jresult != NULL)
+ if (jenv_->ExceptionCheck())
{
- const char *my_string = jenv_->GetStringUTFChars(jresult, JNI_FALSE);
- printf("open error: %s\n", my_string);
+ getExceptionDetails();
logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::open()", jresult);
+ jenv_->PopLocalFrame(NULL);
return OFR_ERROR_OPEN_EXCEPTION;
}
+ jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
@@ -149,18 +181,32 @@ OFR_RetCode OrcFileReader::open(const char* path)
OFR_RetCode OrcFileReader::getPosition(Int64& pos)
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::getPosition(%ld) called.", pos);
+ if (initJNIEnv() != JOI_OK)
+ return OFR_ERROR_GETPOS_EXCEPTION;
// long getPosition();
tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name;
Int64 result = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_GETPOS].methodID);
- if (result == -1)
+ if (jenv_->ExceptionCheck())
{
+ getExceptionDetails();
+ logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::getPosition()", getLastError());
+ jenv_->PopLocalFrame(NULL);
+ return OFR_ERROR_GETPOS_EXCEPTION;
+ }
+
+ if (result == -1) {
+ logError(CAT_SQL_HDFS_ORC_FILE_READER,
+ "OrcFileReader::getPosition()",
+ getLastError());
+ jenv_->PopLocalFrame(NULL);
return OFR_ERROR_GETPOS_EXCEPTION;
}
pos = result;
+ jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
@@ -169,23 +215,41 @@ OFR_RetCode OrcFileReader::getPosition(Int64& pos)
//////////////////////////////////////////////////////////////////////////////
OFR_RetCode OrcFileReader::seeknSync(Int64 pos)
{
- Int64 orcPos;
+ Int64 orcPos;
+
+ if (initJNIEnv() != JOI_OK)
+ return OFR_ERROR_SYNC_EXCEPTION;
- orcPos = pos -1; //When you position in ORC, reading the NEXT row will be one greater than what you wanted.
+ orcPos = pos -1; //When you position in ORC, reading the NEXT row will be one greater than what you wanted.
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::seeknSync(%ld) called.", pos);
// String seeknSync(long);
tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_SYNC].methodID, orcPos);
- if (jresult != NULL)
+ if (jenv_->ExceptionCheck())
{
- const char *my_string = jenv_->GetStringUTFChars(jresult, JNI_FALSE);
- printf("seeknSync error: %s\n", my_string);
- logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::seeknSync()", jresult);
+ getExceptionDetails();
+ logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+ jenv_->PopLocalFrame(NULL);
return OFR_ERROR_SYNC_EXCEPTION;
}
-
+
+ if (jresult != NULL) {
+ const char *my_string = jenv_->GetStringUTFChars(jresult,
+ JNI_FALSE);
+ QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER,
+ LL_DEBUG,
+ "OrcFileReader::seeknSync(%ld) error: %s\n",
+ pos,
+ my_string);
+ logError(CAT_SQL_HDFS_ORC_FILE_READER,
+ "OrcFileReader::seeknSync()",
+ jresult);
+ jenv_->PopLocalFrame(NULL);
+ return OFR_ERROR_SYNC_EXCEPTION;
+ }
+ jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
@@ -196,11 +260,22 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::isEOF() called.");
+ if (initJNIEnv() != JOI_OK)
+ return OFR_ERROR_ISEOF_EXCEPTION;
// boolean isEOF();
tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name;
- bool result = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_ISEOF].methodID);
+ bool result = jenv_->CallBooleanMethod(javaObj_,
+ JavaMethods_[JM_ISEOF].methodID);
+ if (jenv_->ExceptionCheck())
+ {
+ getExceptionDetails();
+ logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+ jenv_->PopLocalFrame(NULL);
+ return OFR_ERROR_ISEOF_EXCEPTION;
+ }
isEOF = result;
+ jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
@@ -211,11 +286,14 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& rowNumber, int& num_columns)
{
/*
+ if (initJNIEnv() != JOI_OK)
+ return OFR_ERROR_FETCHROW_EXCEPTION;
// java.lang.String fetchNextRow(long stopOffset);
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
if (jresult==NULL && getLastError())
{
logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
+ jenv_->PopLocalFrame(NULL);
return OFR_ERROR_FETCHROW_EXCEPTION;
}
@@ -227,7 +305,7 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long&
const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
strcpy(buffer, char_result);
jenv_->ReleaseStringUTFChars(jresult, char_result);
- jenv_->DeleteLocalRef(jresult);
+ jenv_->PopLocalFrame(NULL);
return OFR_OK;
*/
@@ -236,58 +314,71 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long&
tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name;
jobject jresult = (jobject)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID);
+ if (jenv_->ExceptionCheck())
+ {
+ getExceptionDetails();
+ logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+ logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
+ jenv_->PopLocalFrame(NULL);
+ return OFR_ERROR_FETCHROW_EXCEPTION;
+ }
if (jresult==NULL && getLastError())
{
- logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
- return OFR_ERROR_FETCHROW_EXCEPTION;
+ logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::fetchNextRow()", getLastError());
+ return OFR_ERROR_FETCHROW_EXCEPTION;
}
if (jresult == NULL)
- return (OFR_NOMORE); //No more rows
+ return (OFR_NOMORE); //No more rows
//Retrieve row and associated data
jclass cls = jenv_->GetObjectClass(jresult);
fid = jenv_->GetFieldID(cls,"m_row_length","I");
if (fid ==NULL)
- {
- return (OFR_ERROR_FETCHROW_EXCEPTION);
- }
+ {
+ jenv_->PopLocalFrame(NULL);
+ return (OFR_ERROR_FETCHROW_EXCEPTION);
+ }
jint row_length = (jint)jenv_->GetIntField(jresult, fid);
array_length = (long)row_length;
-
fid = jenv_->GetFieldID(cls,"m_column_count","I");
if (fid ==NULL)
- {
- return(OFR_ERROR_FETCHROW_EXCEPTION);
- }
+ {
+ jenv_->PopLocalFrame(NULL);
+ return(OFR_ERROR_FETCHROW_EXCEPTION);
+ }
jint column_count = (jint)jenv_->GetIntField(jresult, fid);
num_columns = column_count;
fid = jenv_->GetFieldID(cls,"m_row_number","J");
if (fid ==NULL)
- {
- return(OFR_ERROR_FETCHROW_EXCEPTION);
- }
+ {
+ jenv_->PopLocalFrame(NULL);
+ return(OFR_ERROR_FETCHROW_EXCEPTION);
+ }
jlong rowNum = (jlong)jenv_->GetIntField(jresult, fid);
rowNumber = rowNum;
-
-
// Get the actual row (it is a byte array). Use the row_length above to specify how much to copy
fid = jenv_->GetFieldID(cls,"m_row_ba","[B");
if (fid ==NULL)
- {
- return (OFR_ERROR_FETCHROW_EXCEPTION);
- }
+ {
+ jenv_->PopLocalFrame(NULL);
+ return (OFR_ERROR_FETCHROW_EXCEPTION);
+ }
jbyteArray jrow = (jbyteArray)jenv_->GetObjectField(jresult, fid);
- if (jrow == NULL)
- return (OFR_ERROR_FETCHROW_EXCEPTION);
+ if (jrow == NULL)
+ {
+ jenv_->PopLocalFrame(NULL);
+ return (OFR_ERROR_FETCHROW_EXCEPTION);
+ }
- jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
- jenv_->DeleteLocalRef(jrow);
+ jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
+ jenv_->DeleteLocalRef(jrow);
+ jenv_->PopLocalFrame(NULL);
return (OFR_OK);
}
@@ -297,22 +388,31 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long&
OFR_RetCode OrcFileReader::close()
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::close() called.");
- if (javaObj_ == NULL)
- {
- // Maybe there was an initialization error.
- return OFR_OK;
- }
+
+ if (initJNIEnv() != JOI_OK)
+ return OFR_ERROR_GETPOS_EXCEPTION;
// String close();
tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID);
- if (jresult!=NULL)
+ if (jenv_->ExceptionCheck())
{
- logError(CAT_SQL_HDFS_ORC_FILE_READER, "OrcFileReader::close()", jresult);
+ getExceptionDetails();
+ logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+ jenv_->PopLocalFrame(NULL);
+ return OFR_ERROR_CLOSE_EXCEPTION;
+ }
+
+ if (jresult!=NULL) {
+ logError(CAT_SQL_HDFS_ORC_FILE_READER,
+ "OrcFileReader::close()",
+ jresult);
+ jenv_->PopLocalFrame(NULL);
return OFR_ERROR_CLOSE_EXCEPTION;
}
+ jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
@@ -322,16 +422,22 @@ OFR_RetCode OrcFileReader::close()
OFR_RetCode OrcFileReader::getRowCount(Int64& count)
{
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, "OrcFileReader::getRowCount() called.");
- if (javaObj_ == NULL)
- {
- // Maybe there was an initialization error.
- return OFR_OK;
- }
-
+ if (initJNIEnv() != JOI_OK)
+ return OFR_ERROR_GETNUMROWS_EXCEPTION;
+
tsRecentJMFromJNI = JavaMethods_[JM_GETNUMROWS].jm_full_name;
jlong jresult = (jlong)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_GETNUMROWS].methodID);
count = jresult;
+ if (jenv_->ExceptionCheck())
+ {
+ getExceptionDetails();
+ logError(CAT_SQL_HDFS_ORC_FILE_READER, __FILE__, __LINE__);
+ jenv_->PopLocalFrame(NULL);
+ return OFR_ERROR_GETNUMROWS_EXCEPTION;
+ }
+
+ jenv_->PopLocalFrame(NULL);
return OFR_OK;
}
@@ -388,5 +494,5 @@ Removed until implemented
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, " =>Returning %d, read %ld bytes in %d rows.", retCode, bytesRead, rowsRead);
return retCode;
*/
- return (OFR_NOMORE);
+ return OFR_OK;
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/OrcFileReader.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.h b/core/sql/executor/OrcFileReader.h
index c9c491b..925456b 100644
--- a/core/sql/executor/OrcFileReader.h
+++ b/core/sql/executor/OrcFileReader.h
@@ -42,6 +42,7 @@ typedef enum {
,OFR_ERROR_ISEOF_EXCEPTION // Java exception in isEOF()
,OFR_ERROR_FETCHROW_EXCEPTION // Java exception in fetchNextRow()
,OFR_ERROR_CLOSE_EXCEPTION // Java exception in close()
+ ,OFR_ERROR_GETNUMROWS_EXCEPTION
,OFR_LAST
} OFR_RetCode;
@@ -115,6 +116,9 @@ private:
static jclass javaClass_;
static JavaMethodInit* JavaMethods_;
+ static bool javaMethodsInitialized_;
+ // this mutex protects both JaveMethods_ and javaClass_ initialization
+ static pthread_mutex_t javaMethodsInitMutex_;
};
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/31041e0f/core/sql/executor/SequenceFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/SequenceFileReader.cpp b/core/sql/executor/SequenceFileReader.cpp
index 0ef0399..a67db87 100644
--- a/core/sql/executor/SequenceFileReader.cpp
+++ b/core/sql/executor/SequenceFileReader.cpp
@@ -176,9 +176,12 @@ SFR_RetCode SequenceFileReader::init()
SFR_RetCode SequenceFileReader::open(const char* path)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::open(%s) called.", path);
+ if (initJNIEnv() != JOI_OK)
+ return SFR_ERROR_OPEN_PARAM;
jstring js_path = jenv_->NewStringUTF(path);
if (js_path == NULL)
- return SFR_ERROR_OPEN_PARAM;
+ jenv_->PopLocalFrame(NULL);
+ return SFR_ERROR_OPEN_PARAM;
// String open(java.lang.String);
tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
@@ -189,9 +192,10 @@ SFR_RetCode SequenceFileReader::open(const char* path)
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::open()", jresult);
+ jenv_->PopLocalFrame(NULL);
return SFR_ERROR_OPEN_EXCEPTION;
}
-
+ jenv_->PopLocalFrame(NULL);
return SFR_OK;
}
@@ -202,6 +206,9 @@ SFR_RetCode SequenceFileReader::getPosition(Int64& pos)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::getPosition(%ld) called.", pos);
+ if (initJNIEnv() != JOI_OK)
+ return SFR_ERROR_GETPOS_EXCEPTION;
+
// long getPosition();
tsRecentJMFromJNI = JavaMethods_[JM_GETPOS].jm_full_name;
Int64 result = jenv_->CallLongMethod(javaObj_, JavaMethods_[JM_GETPOS].methodID);
@@ -209,10 +216,12 @@ SFR_RetCode SequenceFileReader::getPosition(Int64& pos)
if (result == -1)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::getPosition()", getLastError());
+ jenv_->PopLocalFrame(NULL);
return SFR_ERROR_GETPOS_EXCEPTION;
}
pos = result;
+ jenv_->PopLocalFrame(NULL);
return SFR_OK;
}
@@ -223,6 +232,9 @@ SFR_RetCode SequenceFileReader::seeknSync(Int64 pos)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::seeknSync(%ld) called.", pos);
+ if (initJNIEnv() != JOI_OK)
+ return SFR_ERROR_GETPOS_EXCEPTION;
+
// String seeknSync(long);
tsRecentJMFromJNI = JavaMethods_[JM_SYNC].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_SYNC].methodID, pos);
@@ -230,9 +242,11 @@ SFR_RetCode SequenceFileReader::seeknSync(Int64 pos)
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::seeknSync()", jresult);
+ jenv_->PopLocalFrame(NULL);
return SFR_ERROR_SYNC_EXCEPTION;
}
+ jenv_->PopLocalFrame(NULL);
return SFR_OK;
}
@@ -243,10 +257,13 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::isEOF() called.");
+ if (initJNIEnv() != JOI_OK)
+ return SFR_ERROR_ISEOF_EXCEPTION;
// boolean isEOF();
tsRecentJMFromJNI = JavaMethods_[JM_ISEOF].jm_full_name;
bool result = jenv_->CallBooleanMethod(javaObj_, JavaMethods_[JM_ISEOF].methodID);
+ jenv_->PopLocalFrame(NULL);
isEOF = result;
return SFR_OK;
}
@@ -279,24 +296,29 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
{
+ if (initJNIEnv() != JOI_OK)
+ return SFR_ERROR_FETCHROW_EXCEPTION;
+
// java.lang.String fetchNextRow(long stopOffset);
tsRecentJMFromJNI = JavaMethods_[JM_FETCHROW2].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
if (jresult==NULL && getLastError())
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::fetchNextRow()", getLastError());
+ jenv_->PopLocalFrame(NULL);
return SFR_ERROR_FETCHROW_EXCEPTION;
}
if (jresult == NULL)
{
+ jenv_->PopLocalFrame(NULL);
return SFR_NOMORE;
}
const char* char_result = jenv_->GetStringUTFChars(jresult, 0);
strcpy(buffer, char_result);
jenv_->ReleaseStringUTFChars(jresult, char_result);
- jenv_->DeleteLocalRef(jresult);
+ jenv_->PopLocalFrame(NULL);
return SFR_OK;
}
@@ -306,11 +328,9 @@ SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
SFR_RetCode SequenceFileReader::close()
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_READER, LL_DEBUG, "SequenceFileReader::close() called.");
- if (javaObj_ == NULL)
- {
- // Maybe there was an initialization error.
- return SFR_OK;
- }
+
+ if (initJNIEnv() != JOI_OK)
+ return SFR_ERROR_CLOSE_EXCEPTION;
// String close();
tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
@@ -319,9 +339,11 @@ SFR_RetCode SequenceFileReader::close()
if (jresult!=NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::close()", jresult);
+ jenv_->PopLocalFrame(NULL);
return SFR_ERROR_CLOSE_EXCEPTION;
}
+ jenv_->PopLocalFrame(NULL);
return SFR_OK;
}
@@ -535,22 +557,26 @@ SFW_RetCode SequenceFileWriter::init()
SFW_RetCode SequenceFileWriter::open(const char* path, SFW_CompType compression)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::open(%s) called.", path);
+ if (initJNIEnv() != JOI_OK)
+ return SFW_ERROR_OPEN_PARAM;
jstring js_path = jenv_->NewStringUTF(path);
- if (js_path == NULL)
- return SFW_ERROR_OPEN_PARAM;
+ if (js_path == NULL) {
+ jenv_->PopLocalFrame(NULL);
+ return SFW_ERROR_OPEN_PARAM;
+ }
// String open(java.lang.String);
tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path, compression);
- jenv_->DeleteLocalRef(js_path);
-
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::open()", jresult);
+ jenv_->PopLocalFrame(NULL);
return SFW_ERROR_OPEN_EXCEPTION;
}
+ jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
@@ -560,9 +586,14 @@ SFW_RetCode SequenceFileWriter::open(const char* path, SFW_CompType compression)
SFW_RetCode SequenceFileWriter::write(const char* data)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::write(%s) called.", data);
+ if (initJNIEnv() != JOI_OK)
+ return SFW_ERROR_WRITE_PARAM;
+
jstring js_data = jenv_->NewStringUTF(data);
- if (js_data == NULL)
+ if (js_data == NULL) {
+ jenv_->PopLocalFrame(NULL);
return SFW_ERROR_WRITE_PARAM;
+ }
// String write(java.lang.String);
tsRecentJMFromJNI = JavaMethods_[JM_WRITE].jm_full_name;
@@ -573,9 +604,11 @@ SFW_RetCode SequenceFileWriter::write(const char* data)
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::write()", jresult);
+ jenv_->PopLocalFrame(NULL);
return SFW_ERROR_WRITE_EXCEPTION;
}
+ jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
@@ -616,12 +649,10 @@ SFW_RetCode SequenceFileWriter::writeBuffer(char* data, Int64 buffSize, const ch
SFW_RetCode SequenceFileWriter::close()
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::close() called.");
- if (javaObj_ == NULL)
- {
- // Maybe there was an initialization error.
- return SFW_OK;
- }
+ if (initJNIEnv() != JOI_OK)
+ return SFW_ERROR_CLOSE_EXCEPTION;
+
// String close();
tsRecentJMFromJNI = JavaMethods_[JM_CLOSE].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_CLOSE].methodID);
@@ -629,10 +660,11 @@ SFW_RetCode SequenceFileWriter::close()
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::close()", jresult);
+ jenv_->PopLocalFrame(NULL);
return SFW_ERROR_CLOSE_EXCEPTION;
}
-
+ jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
@@ -644,10 +676,8 @@ SFW_RetCode SequenceFileWriter::hdfsCreate(const char* path, NABoolean compress)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::hdfsCreate(%s) called.", path);
- if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
- getExceptionDetails();
- return SFW_ERROR_HDFS_WRITE_EXCEPTION;
- }
+ if (initJNIEnv() != JOI_OK)
+ return SFW_ERROR_HDFS_CREATE_PARAM;
jstring js_path = jenv_->NewStringUTF(path);
if (js_path == NULL) {
@@ -688,10 +718,8 @@ SFW_RetCode SequenceFileWriter::hdfsWrite(const char* data, Int64 len)
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::hdfsWrite(%ld) called.", len);
- if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
- getExceptionDetails();
+ if (initJNIEnv() != JOI_OK)
return SFW_ERROR_HDFS_WRITE_EXCEPTION;
- }
//Write the requisite bytes into the file
jbyteArray jbArray = jenv_->NewByteArray( len);
@@ -733,11 +761,9 @@ SFW_RetCode SequenceFileWriter::hdfsWrite(const char* data, Int64 len)
SFW_RetCode SequenceFileWriter::hdfsClose()
{
QRLogger::log(CAT_SQL_HDFS_SEQ_FILE_WRITER, LL_DEBUG, "SequenceFileWriter::close() called.");
- if (javaObj_ == NULL)
- {
- // Maybe there was an initialization error.
- return SFW_OK;
- }
+
+ if (initJNIEnv() != JOI_OK)
+ return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
// String close();
tsRecentJMFromJNI = JavaMethods_[JM_HDFS_CLOSE].jm_full_name;
@@ -748,27 +774,27 @@ SFW_RetCode SequenceFileWriter::hdfsClose()
getExceptionDetails();
logError(CAT_SQL_HBASE, __FILE__, __LINE__);
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError());
+ jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
}
if (jresult == false)
{
logError(CAT_SQL_HBASE, "SequenceFileWriter::hdfsClose()", getLastError());
+ jenv_->PopLocalFrame(NULL);
return SFW_ERROR_HDFS_CLOSE_EXCEPTION;
}
-
+ jenv_->PopLocalFrame(NULL);
return SFW_OK;
}
SFW_RetCode SequenceFileWriter::hdfsCleanUnloadPath( const NAString& uldPath)
{
QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsCleanUnloadPath(%s) called.",
- uldPath.data());
- if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
- getExceptionDetails();
- return SFW_ERROR_HDFS_CLEANUP_EXCEPTION;
- }
+ uldPath.data());
+ if (initJNIEnv() != JOI_OK)
+ return SFW_ERROR_HDFS_CLEANUP_PARAM;
jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
if (js_UldPath == NULL) {
@@ -799,10 +825,9 @@ SFW_RetCode SequenceFileWriter::hdfsMergeFiles( const NAString& srcPath,
QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsMergeFiles(%s, %s) called.",
srcPath.data(), dstPath.data());
- if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
- getExceptionDetails();
+ if (initJNIEnv() != JOI_OK)
return SFW_ERROR_HDFS_MERGE_FILES_EXCEPTION;
- }
+
jstring js_SrcPath = jenv_->NewStringUTF(srcPath.data());
if (js_SrcPath == NULL) {
@@ -845,10 +870,8 @@ SFW_RetCode SequenceFileWriter::hdfsDeletePath( const NAString& delPath)
{
QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsDeletePath(%s called.",
delPath.data());
- if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
- getExceptionDetails();
+ if (initJNIEnv() != JOI_OK)
return SFW_ERROR_HDFS_DELETE_PATH_EXCEPTION;
- }
jstring js_delPath = jenv_->NewStringUTF(delPath.data());
if (js_delPath == NULL) {
@@ -887,10 +910,8 @@ SFW_RetCode SequenceFileWriter::hdfsExists( const NAString& uldPath, NABoolean &
QRLogger::log(CAT_SQL_HBASE, LL_DEBUG, "SequenceFileWriter::hdfsExists(%s) called.",
uldPath.data());
- if (jenv_->PushLocalFrame(jniHandleCapacity_) != 0) {
- getExceptionDetails();
+ if (initJNIEnv() != JOI_OK)
return SFW_ERROR_HDFS_EXISTS_EXCEPTION;
- }
jstring js_UldPath = jenv_->NewStringUTF(uldPath.data());
if (js_UldPath == NULL) {
[2/3] incubator-trafodion git commit: [TRAFODION-2821] Trafodion core
code base needs to be thread safe
Posted by se...@apache.org.
[TRAFODION-2821] Trafodion core code base needs to be thread safe
Changes as per review
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/46fa80fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/46fa80fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/46fa80fa
Branch: refs/heads/master
Commit: 46fa80faa10b5aadeffe1bef7a3bd0d4d5d2ac73
Parents: 31041e0
Author: selvaganesang <se...@esgyn.com>
Authored: Tue Nov 28 21:16:38 2017 +0000
Committer: selvaganesang <se...@esgyn.com>
Committed: Tue Nov 28 21:16:38 2017 +0000
----------------------------------------------------------------------
core/sql/executor/OrcFileReader.cpp | 6 +--
core/sql/executor/SequenceFileReader.cpp | 78 ---------------------------
2 files changed, 3 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/46fa80fa/core/sql/executor/OrcFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/OrcFileReader.cpp b/core/sql/executor/OrcFileReader.cpp
index d63052d..ddaa27a 100644
--- a/core/sql/executor/OrcFileReader.cpp
+++ b/core/sql/executor/OrcFileReader.cpp
@@ -285,9 +285,10 @@ OFR_RetCode OrcFileReader::isEOF(bool& isEOF)
//OFR_RetCode OrcFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long& rowNumber, int& num_columns)
{
-/*
+
if (initJNIEnv() != JOI_OK)
return OFR_ERROR_FETCHROW_EXCEPTION;
+/*
// java.lang.String fetchNextRow(long stopOffset);
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHROW2].methodID, stopOffset);
if (jresult==NULL && getLastError())
@@ -376,7 +377,6 @@ OFR_RetCode OrcFileReader::fetchNextRow(char * buffer, long& array_length, long&
}
jenv_->GetByteArrayRegion(jrow, 0, row_length, (jbyte*)buffer);
- jenv_->DeleteLocalRef(jrow);
jenv_->PopLocalFrame(NULL);
return (OFR_OK);
@@ -494,5 +494,5 @@ Removed until implemented
QRLogger::log(CAT_SQL_HDFS_ORC_FILE_READER, LL_DEBUG, " =>Returning %d, read %ld bytes in %d rows.", retCode, bytesRead, rowsRead);
return retCode;
*/
- return OFR_OK;
+ return (OFR_NOMORE);
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/46fa80fa/core/sql/executor/SequenceFileReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/SequenceFileReader.cpp b/core/sql/executor/SequenceFileReader.cpp
index a67db87..a12673d 100644
--- a/core/sql/executor/SequenceFileReader.cpp
+++ b/core/sql/executor/SequenceFileReader.cpp
@@ -187,8 +187,6 @@ SFR_RetCode SequenceFileReader::open(const char* path)
tsRecentJMFromJNI = JavaMethods_[JM_OPEN].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_OPEN].methodID, js_path);
- jenv_->DeleteLocalRef(js_path);
-
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_READER, "SequenceFileReader::open()", jresult);
@@ -271,29 +269,6 @@ SFR_RetCode SequenceFileReader::isEOF(bool& isEOF)
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
-//char** SequenceFileReader::fetchArrayOfColumns()
-//{
-// // java.lang.String[] fetchArrayOfColumns();
-// jobjectArray jresult = (jobjectArray)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_FETCHCOLS].methodID);
-// if (jenv_->ExceptionCheck())
-// {
-// jenv_->ExceptionDescribe();
-// jenv_->ExceptionClear();
-// jenv_->DeleteLocalRef(jresult);
-// return NULL;
-// }
-//
-// if (jresult == NULL)
-// {
-// return NULL;
-// }
-//
-// return SequenceFileReader::JStringArray2CharsArray(jresult);
-//}
-
-//////////////////////////////////////////////////////////////////////////////
-//
-//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::fetchNextRow(Int64 stopOffset, char* buffer)
{
if (initJNIEnv() != JOI_OK)
@@ -361,57 +336,6 @@ jstring SequenceFileReader::getLastError()
//////////////////////////////////////////////////////////////////////////////
//
//////////////////////////////////////////////////////////////////////////////
-//char** SequenceFileReader::JStringArray2CharsArray(jobjectArray jarray)
-//{
-// char **chars_array;
-// int len;
-// int i;
-// const char *ret_val;
-// jstring jst_ret;
-//
-// len = jenv_->GetArrayLength(jarray);
-//
-// chars_array = (char **)malloc(sizeof(char*) * (len + 1));
-// if (chars_array == NULL)
-// {
-// //TRACE(stderr, "<%s:%d> malloc() failed\n", __FILE__, __LINE__);
-// return NULL;
-// }
-//
-// for (i = 0; i < len; i++)
-// {
-// ret_val = "";
-//
-// jst_ret = (jstring)jenv_->GetObjectArrayElement(jarray, i);
-// if (jst_ret != NULL)
-// {
-// ret_val = jenv_->GetStringUTFChars(jst_ret, 0);
-// }
-//
-// //TRACE(stderr, "<%s:%d> %d) => %s\n", __FILE__, __LINE__, i, ret_val);
-//
-// chars_array[i] = strdup(ret_val);
-// if (chars_array[i] == NULL)
-// {
-// // TODO: Add error handling
-// return NULL;
-// }
-//
-// if (jst_ret != NULL)
-// {
-// jenv_->ReleaseStringUTFChars(jst_ret, ret_val);
-// jenv_->DeleteLocalRef(jst_ret);
-// }
-// }
-// chars_array[i] = NULL;
-// jenv_->DeleteLocalRef(jarray);
-//
-// return chars_array;
-//}
-
-//////////////////////////////////////////////////////////////////////////////
-//
-//////////////////////////////////////////////////////////////////////////////
SFR_RetCode SequenceFileReader::fetchRowsIntoBuffer(Int64 stopOffset,
char* buffer,
Int64 buffSize,
@@ -599,8 +523,6 @@ SFW_RetCode SequenceFileWriter::write(const char* data)
tsRecentJMFromJNI = JavaMethods_[JM_WRITE].jm_full_name;
jstring jresult = (jstring)jenv_->CallObjectMethod(javaObj_, JavaMethods_[JM_WRITE].methodID, js_data);
- jenv_->DeleteLocalRef(js_data);
-
if (jresult != NULL)
{
logError(CAT_SQL_HDFS_SEQ_FILE_WRITER, "SequenceFileWriter::write()", jresult);
[3/3] incubator-trafodion git commit: Merge PR 1314 [TRAFODION-2821]
Trafodion core code base needs to be thread safe
Posted by se...@apache.org.
Merge PR 1314 [TRAFODION-2821] Trafodion core code base needs to be thread safe
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/1efa9dbd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/1efa9dbd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/1efa9dbd
Branch: refs/heads/master
Commit: 1efa9dbddbaa2308928197d57e4c6fe9461d8bcc
Parents: 6d2213d 46fa80f
Author: selvaganesang <se...@apache.org>
Authored: Wed Nov 29 05:27:15 2017 +0000
Committer: selvaganesang <se...@apache.org>
Committed: Wed Nov 29 05:27:15 2017 +0000
----------------------------------------------------------------------
core/sql/executor/OrcFileReader.cpp | 222 +++++++++++++++++++-------
core/sql/executor/OrcFileReader.h | 4 +
core/sql/executor/SequenceFileReader.cpp | 191 ++++++++--------------
3 files changed, 235 insertions(+), 182 deletions(-)
----------------------------------------------------------------------