You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@trafodion.apache.org by sa...@apache.org on 2015/10/27 05:27:49 UTC
[1/3] incubator-trafodion git commit: [TRAFODION-1549]
[TRAFODION-1550] TRAFODION-1549: Extends the event_log_reader TMUDF to read DCS
logs and overflow files from all components; TRAFODION-1550: Improves
performance of the bulkloader by determining the flush size based on row length
Repository: incubator-trafodion
Updated Branches:
refs/heads/master 2578f5109 -> d060bef04
[TRAFODION-1549] [TRAFODION-1550]
TRAFODION-1549: Extends the event_log_reader TMUDF to read DCS logs and overflow files from all components
TRAFODION-1550: Improves performance of the bulkloader by determining the flush size based on row length
Other changes
Rework for TRAFODION-1279, suggested by Hans but not addressed in the previous pull request
Rework for TRAFODION-1474 to fix a regression found by Weishiun Tsai
Fix for the occasional failure of regress/hive/TEST020 on the build machine
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/f8e41b95
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/f8e41b95
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/f8e41b95
Branch: refs/heads/master
Commit: f8e41b956a8316c9847f36ea8c49746a99949b99
Parents: 46f7be4
Author: Suresh Subbiah <su...@apache.org>
Authored: Wed Oct 21 22:53:57 2015 +0000
Committer: Suresh Subbiah <su...@apache.org>
Committed: Wed Oct 21 22:53:57 2015 +0000
----------------------------------------------------------------------
core/sqf/sql/scripts/genms | 3 +
core/sql/comexe/ComTdbHbaseAccess.cpp | 4 +-
core/sql/comexe/ComTdbHbaseAccess.h | 6 +
core/sql/executor/ExHbaseAccess.cpp | 4 +-
core/sql/executor/ExHbaseAccess.h | 6 +-
core/sql/executor/ExHbaseIUD.cpp | 6 +-
core/sql/generator/GenExplain.cpp | 8 +
core/sql/generator/GenRelUpdate.cpp | 14 +
core/sql/generator/GenUdr.cpp | 5 +-
core/sql/optimizer/NAColumn.cpp | 4 +-
core/sql/optimizer/NAColumn.h | 2 +-
core/sql/regress/hive/EXPECTED020 | 63 +-
core/sql/regress/hive/TEST020 | 27 +-
core/sql/sqlcomp/CmpSeabaseDDLtable.cpp | 2 +-
core/sql/sqlcomp/DefaultConstants.h | 1 +
core/sql/sqlcomp/nadefaults.cpp | 1 +
core/sql/sqludr/SqlUdrPredefLogReader.cpp | 983 +++++++++++++------------
dcs/bin/dcs-daemon.sh | 6 +-
18 files changed, 616 insertions(+), 529 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sqf/sql/scripts/genms
----------------------------------------------------------------------
diff --git a/core/sqf/sql/scripts/genms b/core/sqf/sql/scripts/genms
index ea50d73..13839db 100755
--- a/core/sqf/sql/scripts/genms
+++ b/core/sqf/sql/scripts/genms
@@ -198,6 +198,9 @@ if [ ! -d $cacertsdir ]; then
fi
echo "CACERTS_DIR=$cacertsdir"
+dcsinstalldir=$DCS_INSTALL_DIR
+echo "DCS_INSTALL_DIR=$dcsinstalldir"
+
echo ""
echo ""
echo "# Added by gensq.pl"
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/comexe/ComTdbHbaseAccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHbaseAccess.cpp b/core/sql/comexe/ComTdbHbaseAccess.cpp
index a85751f..f25db03 100644
--- a/core/sql/comexe/ComTdbHbaseAccess.cpp
+++ b/core/sql/comexe/ComTdbHbaseAccess.cpp
@@ -191,7 +191,7 @@ ComTdbHbaseAccess::ComTdbHbaseAccess(
samplingRate_(samplingRate),
sampleLocation_(NULL),
hbaseRowsetVsbbSize_(0),
-
+ trafLoadFlushSize_(0),
hbaseAccessOptions_(hbaseAccessOptions)
{};
@@ -298,7 +298,7 @@ ComTdbHbaseAccess::ComTdbHbaseAccess(
samplingRate_(-1),
sampleLocation_(NULL),
hbaseRowsetVsbbSize_(0),
-
+ trafLoadFlushSize_(0),
hbaseAccessOptions_(NULL)
{
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/comexe/ComTdbHbaseAccess.h
----------------------------------------------------------------------
diff --git a/core/sql/comexe/ComTdbHbaseAccess.h b/core/sql/comexe/ComTdbHbaseAccess.h
index bf5bb42..1066c11 100644
--- a/core/sql/comexe/ComTdbHbaseAccess.h
+++ b/core/sql/comexe/ComTdbHbaseAccess.h
@@ -832,6 +832,11 @@ public:
UInt16 getHbaseRowsetVsbbSize()
{ return hbaseRowsetVsbbSize_; }
+ void setTrafLoadFlushSize(UInt16 size)
+ { trafLoadFlushSize_ = size; }
+ UInt16 getTrafLoadFlushSize()
+ { return trafLoadFlushSize_; }
+
void setLogErrorRows(NABoolean v)
{(v ? flags2_ |= TRAF_LOAD_LOG_ERROR_ROWS : flags2_ &= ~TRAF_LOAD_LOG_ERROR_ROWS); };
NABoolean getLogErrorRows() { return (flags2_ & TRAF_LOAD_LOG_ERROR_ROWS) != 0; };
@@ -982,6 +987,7 @@ public:
HbaseSnapshotScanAttributesPtr hbaseSnapshotScanAttributes_;
UInt32 maxErrorRows_;
UInt16 hbaseRowsetVsbbSize_;
+ UInt16 trafLoadFlushSize_;
HbaseAccessOptionsPtr hbaseAccessOptions_;
char fillers[2];
};
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/executor/ExHbaseAccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseAccess.cpp b/core/sql/executor/ExHbaseAccess.cpp
index 8e98daa..4bcbfdd 100644
--- a/core/sql/executor/ExHbaseAccess.cpp
+++ b/core/sql/executor/ExHbaseAccess.cpp
@@ -2315,7 +2315,7 @@ void ExHbaseAccessTcb::allocateDirectBufferForJNI(UInt32 rowLen)
void ExHbaseAccessTcb::allocateDirectRowBufferForJNI(
- short numCols, short maxRows)
+ short numCols, UInt16 maxRows)
{
UInt32 directBufferOverhead;
UInt32 maxRowLen;
@@ -2371,7 +2371,7 @@ short ExHbaseAccessTcb::patchDirectRowIDBuffers()
return numRowsInBuffer;
}
-void ExHbaseAccessTcb::allocateDirectRowIDBufferForJNI(short maxRows)
+void ExHbaseAccessTcb::allocateDirectRowIDBufferForJNI(UInt16 maxRows)
{
UInt32 rowIDLen;
UInt32 maxRowIDLen;
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/executor/ExHbaseAccess.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseAccess.h b/core/sql/executor/ExHbaseAccess.h
index 9271227..b7cf604 100644
--- a/core/sql/executor/ExHbaseAccess.h
+++ b/core/sql/executor/ExHbaseAccess.h
@@ -342,10 +342,10 @@ protected:
void setRowID(char *rowId, Lng32 rowIdLen);
void allocateDirectBufferForJNI(UInt32 rowLen);
void allocateDirectRowBufferForJNI(short numCols,
- short maxRows = 1);
+ UInt16 maxRows = 1);
short patchDirectRowBuffers();
short patchDirectRowIDBuffers();
- void allocateDirectRowIDBufferForJNI(short maxRows = 1);
+ void allocateDirectRowIDBufferForJNI(UInt16 maxRows = 1);
Lng32 copyColToDirectBuffer( BYTE *rowCurPtr,
char *colName, short colNameLen,
NABoolean prependNullVal, char nullVal,
@@ -464,7 +464,7 @@ protected:
//
BYTE *directRowBuffer_;
Lng32 directRowBufferLen_;
- short directBufferMaxRows_;
+ UInt16 directBufferMaxRows_;
// Structure to keep track of current row
HbaseStr row_;
// Structure to keep track of current position in direct row buffer
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/executor/ExHbaseIUD.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseIUD.cpp b/core/sql/executor/ExHbaseIUD.cpp
index 5e810a4..5c4df55 100644
--- a/core/sql/executor/ExHbaseIUD.cpp
+++ b/core/sql/executor/ExHbaseIUD.cpp
@@ -1385,8 +1385,8 @@ ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work()
}
allocateDirectRowBufferForJNI(
numCols,
- hbaseAccessTdb().getHbaseRowsetVsbbSize());
- allocateDirectRowIDBufferForJNI(hbaseAccessTdb().getHbaseRowsetVsbbSize());
+ hbaseAccessTdb().getTrafLoadFlushSize());
+ allocateDirectRowIDBufferForJNI(hbaseAccessTdb().getTrafLoadFlushSize());
step_ = SETUP_INSERT;
}
break;
@@ -1514,7 +1514,7 @@ ExWorkProcRetcode ExHbaseAccessBulkLoadPrepSQTcb::work()
break ;
}
- if (currRowNum_ < hbaseAccessTdb().getHbaseRowsetVsbbSize())
+ if (currRowNum_ < hbaseAccessTdb().getTrafLoadFlushSize())
{
step_ = DONE;
break;
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/generator/GenExplain.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenExplain.cpp b/core/sql/generator/GenExplain.cpp
index a85f843..28e6c45 100644
--- a/core/sql/generator/GenExplain.cpp
+++ b/core/sql/generator/GenExplain.cpp
@@ -1731,6 +1731,14 @@ GenericUpdate::addSpecificExplainInfo(ExplainTupleMaster *explainTuple,
description += " ";
}
+ if (natable->isSeabaseTable() &&
+ (((ComTdbHbaseAccess *)tdb)->getTrafLoadFlushSize() > 0)) {
+ char lbuf[20];
+ description += "load_flush_size: " ;
+ sprintf(lbuf, "%d ", ((ComTdbHbaseAccess *)tdb)->getTrafLoadFlushSize());
+ description += lbuf;
+ }
+
return 0;
}
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/generator/GenRelUpdate.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenRelUpdate.cpp b/core/sql/generator/GenRelUpdate.cpp
index 7989de3..07944d3 100644
--- a/core/sql/generator/GenRelUpdate.cpp
+++ b/core/sql/generator/GenRelUpdate.cpp
@@ -2835,6 +2835,20 @@ short HbaseInsert::codeGen(Generator *generator)
hbasescan_tdb->setNoDuplicates(CmpCommon::getDefault(TRAF_LOAD_PREP_SKIP_DUPLICATES) == DF_OFF);
hbasescan_tdb->setMaxHFileSize(CmpCommon::getDefaultLong(TRAF_LOAD_MAX_HFILE_SIZE));
+ ULng32 loadFlushSize = getDefault(TRAF_LOAD_FLUSH_SIZE_IN_ROWS);
+ if (loadFlushSize == 0)
+ {// user has not specified a size, assume 1MB buffer is optimal
+
+ loadFlushSize = (1024*1024)/hbasescan_tdb->getRowLen() ;
+ if (loadFlushSize > getMaxCardEst().value()) {
+ // for small tables go back to previous default
+ loadFlushSize = getDefault(HBASE_ROWSET_VSBB_SIZE);
+ }
+ }
+ if (loadFlushSize > USHRT_MAX) // largest flush size, runtime cannot
+ loadFlushSize = USHRT_MAX; // handle higher values without code change
+ hbasescan_tdb->setTrafLoadFlushSize(loadFlushSize);
+
// For sample file, set the sample location in HDFS and the sampling rate.
// Move later, when sampling not limited to bulk loads.
if (getCreateUstatSample())
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/generator/GenUdr.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenUdr.cpp b/core/sql/generator/GenUdr.cpp
index 64f042e..a19aa47 100644
--- a/core/sql/generator/GenUdr.cpp
+++ b/core/sql/generator/GenUdr.cpp
@@ -361,7 +361,10 @@ ExplainTuple *PhysicalTableMappingUDF::addSpecificExplainInfo(ExplainTupleMaster
description += getNARoutine()->getLibrarySqlName().getExternalName();
description += " external_file: ";
- description += getNARoutine()->getExternalPath();
+ if (getNARoutine()->getLanguage() == COM_LANGUAGE_JAVA)
+ description += getNARoutine()->getExternalPath();
+ else
+ description += getNARoutine()->getFile(); // CPP
description += " ";
explainTuple->setDescription(description);
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/optimizer/NAColumn.cpp
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/NAColumn.cpp b/core/sql/optimizer/NAColumn.cpp
index 8814db8..1f90ed3 100644
--- a/core/sql/optimizer/NAColumn.cpp
+++ b/core/sql/optimizer/NAColumn.cpp
@@ -750,13 +750,13 @@ Lng32 NAColumnArray::getOffset(Lng32 position) const
return result;
}
-Lng32 NAColumnArray::getMaxTrafHbaseColQualifier() const
+ULng32 NAColumnArray::getMaxTrafHbaseColQualifier() const
{
NAColumn *column;
char * colQualPtr;
Lng32 colQualLen;
Int64 colQVal;
- Lng32 maxVal = 0;
+ ULng32 maxVal = 0;
for (CollIndex i = 0; i < entries(); i++)
{
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/optimizer/NAColumn.h
----------------------------------------------------------------------
diff --git a/core/sql/optimizer/NAColumn.h b/core/sql/optimizer/NAColumn.h
index 46ab004..1573cb9 100644
--- a/core/sql/optimizer/NAColumn.h
+++ b/core/sql/optimizer/NAColumn.h
@@ -631,7 +631,7 @@ public:
// numeric > 0. This method is used during alter table add
// column to find the maximum value currently in use. Columns
// are deleted during alter table drop column.
- Lng32 getMaxTrafHbaseColQualifier() const;
+ ULng32 getMaxTrafHbaseColQualifier() const;
private:
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/regress/hive/EXPECTED020
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/EXPECTED020 b/core/sql/regress/hive/EXPECTED020
index bd1cdfe..8f07fda 100644
--- a/core/sql/regress/hive/EXPECTED020
+++ b/core/sql/regress/hive/EXPECTED020
@@ -14,13 +14,28 @@
--- SQL operation complete.
>>
+>>prepare explainIt from
++> select substring(cast(SEQ_NUM+100 as char(3)),2,2) s,
++> substring(operator,1,16) operator,
++> cast(LEFT_CHILD_SEQ_NUM as char(2)) lc,
++> cast(RIGHT_CHILD_SEQ_NUM as char(2)) rc,
++> substring
++> (substring(tname from (1+locate('.',tname))),
++> (locate('.',substring(tname from (1+locate('.',tname))))),
++> 10
++> ) tab_name
++> from table (explain(NULL,'XX'))
++> order by 1 desc;
+
+--- SQL command prepared.
+>>
>>obey TEST020(tests);
>>--------------------------------------------------------------------------
>>-- ORC file metadata info
>>invoke hive.hive.store_orc;
-- Definition of hive table STORE_ORC
--- Definition current Fri Sep 18 21:54:29 2015
+-- Definition current Wed Oct 21 00:08:18 2015
(
S_STORE_SK INT
@@ -132,24 +147,25 @@ S_STORE_SK (EXPR)
--- 1 row(s) selected.
>>
>>-- explain of join between 2 ORC tables
->>explain options 'f' select x.s_suite_number, y.s_street_name
+>>prepare XX from select x.s_suite_number, y.s_street_name
+> from hive.hive.store_orc x, hive.hive.store_orc y
+> where x.s_store_sk = y.s_store_sk;
-LC RC OP OPERATOR OPT DESCRIPTION CARD
----- ---- ---- -------------------- -------- -------------------- ---------
+--- SQL command prepared.
+>>execute explainIt;
-3 . 4 root 8.17E+004
-2 1 3 hybrid_hash_join 8.17E+004
-. . 2 hive_scan STORE_ORC 4.08E+003
-. . 1 hive_scan STORE_ORC 4.08E+003
+S OPERATOR LC RC TAB_NAME
+-- ---------------- -- -- ----------
---- SQL operation complete.
+04 ROOT 3 ?
+03 HYBRID_HASH_JOIN 2 1
+02 HIVE_SCAN ? ? .STORE_ORC
+01 HIVE_SCAN ? ? .STORE_ORC
+
+--- 4 row(s) selected.
>>
>>-- execute of join between 2 ORC tables
->>select x.s_suite_number, y.s_street_name
-+> from hive.hive.store_orc x, hive.hive.store_orc y
-+> where x.s_store_sk = y.s_store_sk;
+>>execute XX;
S_SUITE_NUMBER S_STREET_NAME
-------------------- --------------------
@@ -170,24 +186,25 @@ Suite 100 College
--- 12 row(s) selected.
>>
>>-- explain of join between hive(hdfs) and ORC tables
->>explain options 'f' select x.s_suite_number, y.s_street_name
+>>prepare XX from select x.s_suite_number, y.s_street_name
+> from hive.hive.store x, hive.hive.store_orc y
+> where x.s_store_sk = y.s_store_sk;
-LC RC OP OPERATOR OPT DESCRIPTION CARD
----- ---- ---- -------------------- -------- -------------------- ---------
+--- SQL command prepared.
+>>execute explainIt;
-3 . 4 root 4.08E+003
-1 2 3 nested_join 4.08E+003
-. . 2 hive_scan STORE_ORC 4.08E+001
-. . 1 hive_scan STORE 1.00E+002
+S OPERATOR LC RC TAB_NAME
+-- ---------------- -- -- ----------
---- SQL operation complete.
+04 ROOT 3 ?
+03 NESTED_JOIN 1 2
+02 HIVE_SCAN ? ? .STORE_ORC
+01 HIVE_SCAN ? ? .STORE)
+
+--- 4 row(s) selected.
>>
>>-- execute of join between hive(hdfs) and ORC tables
->>select x.s_suite_number, y.s_street_name
-+> from hive.hive.store x, hive.hive.store_orc y
-+> where x.s_store_sk = y.s_store_sk;
+>>execute XX;
S_SUITE_NUMBER S_STREET_NAME
-------------------- --------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/regress/hive/TEST020
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/TEST020 b/core/sql/regress/hive/TEST020
index a35e28a..ded1d98 100644
--- a/core/sql/regress/hive/TEST020
+++ b/core/sql/regress/hive/TEST020
@@ -36,6 +36,19 @@ cqd HIVE_MAX_STRING_LENGTH '20' ;
cqd mode_seahive 'ON';
cqd traf_enable_orc_format 'ON';
+prepare explainIt from
+ select substring(cast(SEQ_NUM+100 as char(3)),2,2) s,
+ substring(operator,1,16) operator,
+ cast(LEFT_CHILD_SEQ_NUM as char(2)) lc,
+ cast(RIGHT_CHILD_SEQ_NUM as char(2)) rc,
+ substring
+ (substring(tname from (1+locate('.',tname))),
+ (locate('.',substring(tname from (1+locate('.',tname))))),
+ 10
+ ) tab_name
+ from table (explain(NULL,'XX'))
+ order by 1 desc;
+
?section tests
--------------------------------------------------------------------------
-- ORC file metadata info
@@ -54,22 +67,20 @@ select s_store_sk, left(s_store_id, 20) from hive.hive.store_orc where s_store_s
select count(*) from hive.hive.store_orc;
-- explain of join between 2 ORC tables
-explain options 'f' select x.s_suite_number, y.s_street_name
+prepare XX from select x.s_suite_number, y.s_street_name
from hive.hive.store_orc x, hive.hive.store_orc y
where x.s_store_sk = y.s_store_sk;
+execute explainIt;
-- execute of join between 2 ORC tables
-select x.s_suite_number, y.s_street_name
- from hive.hive.store_orc x, hive.hive.store_orc y
- where x.s_store_sk = y.s_store_sk;
+execute XX;
-- explain of join between hive(hdfs) and ORC tables
-explain options 'f' select x.s_suite_number, y.s_street_name
+prepare XX from select x.s_suite_number, y.s_street_name
from hive.hive.store x, hive.hive.store_orc y
where x.s_store_sk = y.s_store_sk;
+execute explainIt;
-- execute of join between hive(hdfs) and ORC tables
-select x.s_suite_number, y.s_street_name
- from hive.hive.store x, hive.hive.store_orc y
- where x.s_store_sk = y.s_store_sk;
+execute XX;
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/sqlcomp/CmpSeabaseDDLtable.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/CmpSeabaseDDLtable.cpp b/core/sql/sqlcomp/CmpSeabaseDDLtable.cpp
index 6fe3fbc..05517f7 100644
--- a/core/sql/sqlcomp/CmpSeabaseDDLtable.cpp
+++ b/core/sql/sqlcomp/CmpSeabaseDDLtable.cpp
@@ -4596,7 +4596,7 @@ void CmpSeabaseDDL::alterSeabaseTableAddColumn(
char * col_name = new(STMTHEAP) char[colName.length() + 1];
strcpy(col_name, (char*)colName.data());
- Lng32 maxColQual = nacolArr.getMaxTrafHbaseColQualifier();
+ ULng32 maxColQual = nacolArr.getMaxTrafHbaseColQualifier();
NAString quotedHeading;
if (NOT heading.isNull())
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/sqlcomp/DefaultConstants.h
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h
index c702b8f..cbe4fcc 100644
--- a/core/sql/sqlcomp/DefaultConstants.h
+++ b/core/sql/sqlcomp/DefaultConstants.h
@@ -3766,6 +3766,7 @@ enum DefaultConstants
// costing code has broader exposure.
HBASE_DELETE_COSTING,
HBASE_UPDATE_COSTING,
+ TRAF_LOAD_FLUSH_SIZE_IN_ROWS,
// This enum constant must be the LAST one in the list; it's a count,
// not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)!
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index 9e23cfe..e3ad370 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -3317,6 +3317,7 @@ XDDkwd__(SUBQUERY_UNNESTING, "ON"),
DD_____(TRAF_LOAD_ERROR_COUNT_ID, "" ),
DD_____(TRAF_LOAD_ERROR_COUNT_TABLE, "ERRORCOUNTER" ),
DD_____(TRAF_LOAD_ERROR_LOGGING_LOCATION, "/bulkload/logs/" ),
+ DDint__(TRAF_LOAD_FLUSH_SIZE_IN_ROWS, "0"), // in # rows
DDkwd__(TRAF_LOAD_FORCE_CIF, "ON"),
DDkwd__(TRAF_LOAD_LOG_ERROR_ROWS, "OFF"),
DDint__(TRAF_LOAD_MAX_ERROR_ROWS, "0"),
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/core/sql/sqludr/SqlUdrPredefLogReader.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqludr/SqlUdrPredefLogReader.cpp b/core/sql/sqludr/SqlUdrPredefLogReader.cpp
index 6a36a3c..b9e54f7 100644
--- a/core/sql/sqludr/SqlUdrPredefLogReader.cpp
+++ b/core/sql/sqludr/SqlUdrPredefLogReader.cpp
@@ -224,7 +224,7 @@ bool validateCharsAndCopy(char *outBuf, int outBufLen,
//
// log_ts timestamp(6),
// severity char(10 bytes) character set utf8,
-// component char(24 bytes) character set utf8,
+// component varchar(50 bytes) character set utf8,
// node_number integer,
// cpu integer,
// pin integer,
@@ -379,7 +379,7 @@ void ReadCppEventsUDFInterface::describeParamsAndColumns(
true,
6)));
outTable.addCharColumn ("SEVERITY", 10, true);
- outTable.addCharColumn ("COMPONENT", 24, true);
+ outTable.addVarCharColumn("COMPONENT", 50, true);
outTable.addIntColumn ("NODE_NUMBER", true);
outTable.addIntColumn ("CPU", true);
outTable.addIntColumn ("PIN", true);
@@ -474,533 +474,556 @@ void ReadCppEventsUDFInterface::processData(UDRInvocationInfo &info,
char *ok = NULL; // status of fgets
int haveRowToEmit = 0;
int appendPos = 0;
+ int numLogLocations = 2 ;
- char* sqroot = getenv("MY_SQROOT");
- if (strlen(sqroot) > 1000)
- throw UDRException(38001, "SQROOT is longer than 1000 characters");
-
- std::string logDirName(sqroot);
- std::string confFileName(sqroot);
- std::string logFileName;
- std::string eventLogFileName(sqroot);
-
- logDirName += "/logs";
- confFileName += "/conf/log4cxx.trafodion.config";
-
- if (doTrace)
+ for(int logLocationIndex = 0; logLocationIndex < numLogLocations; logLocationIndex++)
+ {
+ char* logrootdir = NULL;
+ switch (logLocationIndex)
+ {
+ case 0: // sqroot, for all logs other than dcs
+ logrootdir = getenv("MY_SQROOT");
+ if (strlen(logrootdir) > 1000)
+ throw UDRException(38001, "SQROOT is longer than 1000 characters");
+ break ;
+ case 1:
+ logrootdir = getenv("DCS_INSTALL_DIR");
+ if (!logrootdir)
+ throw UDRException(38001, "DCS_INSTALL_DIR not set");
+ else if (strlen(logrootdir) > 1000)
+ throw UDRException(38001, "DCS_INSTALL_DIR is longer than 1000 characters");
+ break ;
+ default:
+ throw UDRException(38001, "Internal error in determining logroot directory");
+ }
+
+ std::string logDirName(logrootdir);
+ std::string confFileName(logrootdir);
+ std::string logFileName;
+ std::string eventLogFileName(logrootdir);
+
+ logDirName += "/logs";
+ confFileName += "/conf/log4cxx.trafodion.config"; // conf file logic not coded for dcs yet
+
+ if (doTrace)
{
printf("(%d) EVENT_LOG_READER open log dir %s\n", pid, logDirName.data());
fflush(stdout);
}
-
- errno = 0;
- if (logDir_ != NULL)
+
+ errno = 0;
+ if (logDir_ != NULL)
{
// this may happen if we exited a previous call with an exception
closedir(logDir_);
logDir_ = NULL;
}
-
- logDir_ = opendir(logDirName.data());
- if (logDir_ == NULL)
- throw UDRException(
- 38002,
- "Error %d on opening directory %s",
- (int) errno, logDirName.data());
-
- cFile = fopen(confFileName.data(), "r");
- if (cFile)
+
+ logDir_ = opendir(logDirName.data());
+ if (logDir_ == NULL)
+ throw UDRException(
+ 38002,
+ "Error %d on opening directory %s",
+ (int) errno, logDirName.data());
+
+ cFile = fopen(confFileName.data(), "r");
+ if (cFile)
{
char * name ;
if (doTrace)
- {
- printf("(%d) EVENT_LOG_READER Conf file fopen\n", pid);
- fflush(stdout);
- }
+ {
+ printf("(%d) EVENT_LOG_READER Conf file fopen\n", pid);
+ fflush(stdout);
+ }
while ((ok = fgets(cFileInputLine, sizeof(cFileInputLine),
cFile)) != NULL)
- {
- if (name = strstr(cFileInputLine,
- "log4j.appender.mxoAppender.fileName="))
- {
- name = name + strlen("log4j.appender.mxoAppender.fileName=");
- if (strstr(name, "${trafodion.log.dir}/"))
- {
- name = name + strlen("${trafodion.log.dir}/");
- name[strlen(name) - 1] = '_' ;
- eventLogFileName.assign(name);
- }
- else
- {
- // log file directory different from expected.
- // currently not supported.
- }
- break ;
- }
- else
- memset((char *)cFileInputLine, 0, sizeof(cFileInputLine));
+ {
+ if (name = strstr(cFileInputLine,
+ "log4j.appender.mxoAppender.fileName="))
+ {
+ name = name + strlen("log4j.appender.mxoAppender.fileName=");
+ if (strstr(name, "${trafodion.log.dir}/"))
+ {
+ name = name + strlen("${trafodion.log.dir}/");
+ name[strlen(name) - 1] = '_' ;
+ eventLogFileName.assign(name);
+ }
+ else
+ {
+ // log file directory different from expected.
+ // currently not supported.
+ }
+ break ;
+ }
+ else
+ memset((char *)cFileInputLine, 0, sizeof(cFileInputLine));
}
fclose(cFile);
cFile = NULL;
}
- if (addFileColumns)
- // log_file_node is the same for every row generated by this process
- info.out().setInt(LOG_FILE_NODE_COLNUM, info.getMyInstanceNum());
-
- // ---------------------------------------------------------------------------
- // Loop over the files in the log directory
- // ---------------------------------------------------------------------------
- while (1)
- {
- // read the next file in the log directory
-
- errno = 0;
- struct dirent *dirEntry = readdir(logDir_);
-
- if (errno != 0)
- throw UDRException(
- 38003,
- "Error %d on reading from directory %s",
- (int) errno, logDirName.data());
-
- // last file seen, we are done
- if (dirEntry == NULL)
- break;
+ if (addFileColumns)
+ // log_file_node is the same for every row generated by this process
+ info.out().setInt(LOG_FILE_NODE_COLNUM, info.getMyInstanceNum());
- if (doTrace)
+ // ---------------------------------------------------------------------------
+ // Loop over the files in the log directory
+ // ---------------------------------------------------------------------------
+ while (1)
+ {
+ // read the next file in the log directory
+
+ errno = 0;
+ struct dirent *dirEntry = readdir(logDir_);
+
+ if (errno != 0)
+ throw UDRException(
+ 38003,
+ "Error %d on reading from directory %s",
+ (int) errno, logDirName.data());
+
+ // last file seen, we are done
+ if (dirEntry == NULL)
+ break;
+
+ if (doTrace)
{
- printf("(%d) EVENT_LOG_READER examining log file %s\n", pid, dirEntry->d_name);
- fflush(stdout);
+ printf("(%d) EVENT_LOG_READER examining log file %s\n", pid, dirEntry->d_name);
+ fflush(stdout);
}
- const char *fileName = dirEntry->d_name;
- size_t nameLen = strlen(fileName);
- const char *suffix = NULL;
- const char *expectedSuffix = ".log";
- size_t expectedSuffixLen = strlen(expectedSuffix);
-
- if (nameLen > expectedSuffixLen)
- suffix = &fileName[nameLen-expectedSuffixLen];
-
- // parse the file name to see whether this is a file we want to look at,
- // allow some fixed string values as well as any name configured in
- // the config file
- if (suffix && strcmp(suffix, expectedSuffix) == 0 &&
- (strstr(fileName, "mxosrvr_") == fileName ||
- strstr(fileName, "tmf") == fileName ||
- strstr(fileName, "master_exec_") == fileName ||
- strstr(fileName, eventLogFileName.data()) == fileName ||
- strstr(fileName, "tm.log") == fileName ||
- strstr(fileName, "mxlobsrvr") == fileName ||
- strstr(fileName, "sscp") == fileName ||
- strstr(fileName, "ssmp") == fileName ||
- strstr(fileName, "mon") == fileName ||
- strstr(fileName, "pstartd") == fileName ||
- strstr(fileName, "wdg") == fileName ||
- strstr(fileName, "udr") == fileName)
- )
+ const char *fileName = dirEntry->d_name;
+ size_t nameLen = strlen(fileName);
+ const char *suffix = NULL;
+ const char *expectedSuffixPart = ".log";
+ size_t expectedSuffixLenMax = strlen(expectedSuffixPart);
+ if (logLocationIndex == 0)
+ expectedSuffixLenMax += 2 ; // for "*log.1" rollover files
+ else if (logLocationIndex == 1)
+ expectedSuffixLenMax += 11 ; // for "*.log.yyyy-mm-dd" rollover files
+
+
+ if (nameLen > expectedSuffixLenMax)
+ suffix = &fileName[nameLen-expectedSuffixLenMax];
+
+ // parse the file name to see whether this is a file we want to look at,
+ // allow some fixed string values as well as any name configured in
+ // the config file
+ if (suffix && strstr(suffix, expectedSuffixPart) != NULL &&
+ (strstr(fileName, "master_exec_") == fileName ||
+ strstr(fileName, eventLogFileName.data()) == fileName ||
+ strstr(fileName, "tm_") == fileName ||
+ strstr(fileName, "mxlobsrvr_") == fileName ||
+ strstr(fileName, "sscp") == fileName ||
+ strstr(fileName, "ssmp") == fileName ||
+ strstr(fileName, "mon") == fileName ||
+ strstr(fileName, "pstartd") == fileName ||
+ strstr(fileName, "wdg") == fileName ||
+ strstr(fileName, "udr_") == fileName ||
+ strstr(fileName, "dcs-") == fileName
+ ))
{
if (infile_ != NULL)
- {
- fclose(infile_);
- infile_ = NULL;
- }
-
+ {
+ fclose(infile_);
+ infile_ = NULL;
+ }
+
logFileName = logDirName + "/" + fileName;
-
+
// Open the input file
infile_ = fopen(logFileName.data(), "r");
if (infile_ == NULL)
throw UDRException(
- 38001,
- "Error %d returned when opening log file %s",
- status, fileName);
-
+ 38001,
+ "Error %d returned when opening log file %s",
+ status, fileName);
+
if (doTrace)
- {
- printf("(%d) EVENT_LOG_READER fopen\n", pid);
- fflush(stdout);
- }
+ {
+ printf("(%d) EVENT_LOG_READER fopen\n", pid);
+ fflush(stdout);
+ }
if (addFileColumns)
info.out().setString(LOG_FILE_NAME_COLNUM, fileName);
-
+
lineNumber = 0;
std::string messageTextField;
std::string rowParseStatus;
-
+
// ---------------------------------------------------------------------
// Loop over the lines of the file
// ---------------------------------------------------------------------
while ((ok = fgets(inputLine, sizeof(inputLine), infile_)) != NULL)
- {
- int year, month, day, hour, minute, second, fraction;
- char fractionSeparator[2];
- char *currField = inputLineValidated;
- char *nextField = NULL;
- int numChars = 0;
- int numItems = 0;
- int intFieldVal;
- int lineLength = strlen(inputLine);
- int lineLengthValidated = 0;
- std::string lineParseError;
-
- lineNumber++;
-
- // skip any empty lines, should not really happen
- if (lineLength < 2)
- {
- if (doTrace)
- {
- printf("(%d) EVENT_LOG_READER read short line %s\n", pid, inputLine);
- fflush(stdout);
- }
-
- continue;
- }
-
- // remove a trailing LF character
- if (inputLine[lineLength-1] == '\n')
- {
- lineLength--;
- inputLine[lineLength] = 0;
- }
- else
- {
- // skip over any text in the same line that
- // didn't get read
- char extraChars[4000];
- char *extraStatus;
- do
- {
- extraStatus = fgets(extraChars, sizeof(extraChars), infile_);
- }
- while (extraStatus != NULL && extraChars[strlen(extraChars)-1] != '\n');
-
- setParseError(TruncationError, lineParseError);
- }
-
- if (! validateCharsAndCopy(inputLineValidated,
- sizeof(inputLineValidated),
- inputLine,
- lineLength,
- lineLengthValidated))
- {
- if (doTrace)
- {
- printf("(%d) EVENT_LOG_READER invalid UTF8 char line %d\n",
- pid, lineNumber);
- fflush(stdout);
- }
- setParseError(CharConversionError, lineParseError);
- }
-
-
- // try to read the timestamp at the beginning of the line. Example:
- // 2014-10-30 20:49:53,252
- numItems = sscanf(currField,
- "%4d-%2d-%2d %2d:%2d:%2d %1[,.] %6d%n",
- &year, &month, &day, &hour, &minute,
- &second, fractionSeparator, &fraction, &numChars);
-
- if (numItems == 8)
- {
- // We were able to read a timestamp field
-
- // Emit previous row, we have seen the start of next row
- if (haveRowToEmit)
- {
- // set final two columns, message text and parse error
- setCharOutputColumn(info,
- MESSAGE_COLNUM,
- messageTextField.data(),
- rowParseStatus);
- if (addFileColumns)
- setCharOutputColumn(info,
- PARSE_STATUS_COLNUM,
- rowParseStatus.c_str(),
- rowParseStatus);
- emitRow(info);
- if (doTrace)
- {
- printf("(%d) EVENT_LOG_READER emit\n", pid);
- fflush(stdout);
- }
- }
-
- // we read a line that will produce an output row, initialize
- // some fields for this output row
- haveRowToEmit = 0;
- appendPos = 0;
- messageTextField.erase();
- rowParseStatus = lineParseError;
-
- // When we see a comma between time and fraction, we interpret
- // that as a fraction that is specified in milliseconds. Convert
- // to microseconds. When it's specified with a dot, we interpret
- // the fraction as microseconds (SQL syntax).
- if (*fractionSeparator == ',')
- fraction *= 1000;
-
- char buf[100];
- snprintf(buf, sizeof(buf),
- "%04d-%02d-%02d %02d:%02d:%02d.%06d",
- year, month, day, hour, minute, second, fraction);
- setCharOutputColumn(info, LOG_TS_COLNUM, buf, rowParseStatus);
- }
- else
- {
- if (!haveRowToEmit)
- {
- // no valid timestamp and We did not have a previous line
- // with a timestamp
- if (numItems > 6)
- numItems = 6;
-
- if (doTrace)
- {
- printf("(%d) EVENT_LOG_READER Read only %d of 7 timestamp fields: %s\n",
- pid, numItems, currField);
- fflush(stdout);
- }
-
- // return a NULL value if we fail to parse the timestamp
- info.out().setNull(LOG_TS_COLNUM);
- rowParseStatus = lineParseError;
- setParseError(FieldParserError, rowParseStatus);
- }
- else
- {
- // no valid timestamp and we have a row to emit
- // consider this line as a continuation of previous row
- // (add a blank instead of a line feed, though)
- messageTextField += " ";
- messageTextField += currField;
-
- // add any parse errors from this line
- for (std::string::iterator it = lineParseError.begin();
- it != lineParseError.end();
- it++)
- setParseError(*it, rowParseStatus);
- }
- }
-
- if (!haveRowToEmit)
- {
- // skip over the information already read
- currField = currField + numChars;
-
- // skip over the comma
- currField = strstr(currField, ",");
- if (currField)
- currField++;
- else
- {
- // did not find a comma delimiter, this is a parse
- // error, produce NULL values for remaining columns
- // except the message
- if (numChars > 0)
- currField = inputLineValidated + (numChars-1);
- else
- currField = inputLineValidated;
- *currField = 0;
- messageTextField =
- (numChars < lineLengthValidated) ? currField+1 : currField;
- setParseError(FieldParserError, rowParseStatus);
- }
- }
-
- // read columns 2: SEVERITY - 9: QUERY_ID
- for (columnNum = 2; (columnNum <= 9 && !haveRowToEmit); columnNum++)
- {
- // find the next comma, the end of our field value
- char *endOfField = strstr(currField, ",");
- char *startOfVal = NULL;
-
- if (endOfField != NULL)
- {
- startOfVal = (endOfField != currField ? endOfField-1 : currField);
-
- // next field starts after the comma
- nextField = endOfField + 1;
-
- // back up before the trailing comma, if the value is not empty
- if (endOfField != currField)
- endOfField--;
-
- // remove trailing blanks
- while (*endOfField == ' ')
- endOfField--;
-
- // place a nul-terminator at the end of the field
- // (this overwrites the comma or a trailing blank)
- if (endOfField != currField)
- endOfField[1] = 0;
- else
- endOfField[0] = 0; // empty field
-
- // from the end, go back to the preceding ":" or ","
- // or until we reach the start of the current field
- // This way, we skip field names like "CPU:" in CPU: 3
- while (*startOfVal != ':' &&
- *startOfVal != ',' &&
- startOfVal != currField)
- startOfVal--;
-
- // skip the ":"
- if (startOfVal != currField)
- startOfVal++;
-
- // skip leading blanks
- while (*startOfVal == ' ')
- startOfVal++;
- } // found a comma delimiter
- else
- {
- // Did not find a comma delimiter. This could be a
- // parse error or a missing optional column,
- // produce NULL values for remaining columns
- // except the message
- if (currField != inputLineValidated)
- // back up, since currField is now pointing
- // at the first character of the message text
- currField--;
- *currField = 0;
- startOfVal = currField;
- nextField = currField;
- // if there is any text left, point to it
- // (after currField, which points to a NUL byte)
- // otherwise set the message text field to an empty string
- if (messageTextField.empty())
- messageTextField =
- (currField-inputLineValidated < lineLengthValidated) ?
- currField+1 : currField;
- setParseError(FieldParserError, rowParseStatus);
- }
-
- // now that we have the non-blank portion of the value,
- // copy it into the output column
- switch (columnNum)
- {
- case 2:
- setCharOutputColumn(info,
- SEVERITY_COLNUM,
- startOfVal,
- rowParseStatus);
- break;
-
- case 3:
- setCharOutputColumn(info,
- COMPONENT_COLNUM,
- startOfVal,
- rowParseStatus);
- break;
-
- case 4:
- setIntOutputColumn(info,
- NODE_NUMBER_COLNUM,
- startOfVal,
- rowParseStatus);
- break;
-
- case 5:
- setIntOutputColumn(info,
- CPU_COLNUM,
- startOfVal,
- rowParseStatus);
- break;
-
- case 6:
- setIntOutputColumn(info,
- PIN_COLNUM,
- startOfVal,
- rowParseStatus);
- break;
-
- case 7:
- setCharOutputColumn(info,
- PROCESS_NAME_COLNUM,
- startOfVal,
- rowParseStatus);
- break;
-
- case 8:
- setIntOutputColumn(info,
- SQL_CODE_COLNUM,
- startOfVal,
- rowParseStatus);
- break;
-
- case 9:
- setCharOutputColumn(info,
- QUERY_ID_COLNUM,
- startOfVal,
- rowParseStatus);
- // we read all required fields,
- // next field is the message text
- if (messageTextField.empty())
- messageTextField = nextField;
- break;
- }
-
- currField = nextField;
- } // loop over column numbers 2-9
-
+ {
+ int year, month, day, hour, minute, second, fraction;
+ char fractionSeparator[2];
+ char *currField = inputLineValidated;
+ char *nextField = NULL;
+ int numChars = 0;
+ int numItems = 0;
+ int intFieldVal;
+ int lineLength = strlen(inputLine);
+ int lineLengthValidated = 0;
+ std::string lineParseError;
+
+ lineNumber++;
+
+ // skip any empty lines, should not really happen
+ if (lineLength < 2)
+ {
+ if (doTrace)
+ {
+ printf("(%d) EVENT_LOG_READER read short line %s\n", pid, inputLine);
+ fflush(stdout);
+ }
+
+ continue;
+ }
+
+ // remove a trailing LF character
+ if (inputLine[lineLength-1] == '\n')
+ {
+ lineLength--;
+ inputLine[lineLength] = 0;
+ }
+ else
+ {
+ // skip over any text in the same line that
+ // didn't get read
+ char extraChars[4000];
+ char *extraStatus;
+ do
+ {
+ extraStatus = fgets(extraChars, sizeof(extraChars), infile_);
+ }
+ while (extraStatus != NULL && extraChars[strlen(extraChars)-1] != '\n');
+
+ setParseError(TruncationError, lineParseError);
+ }
+
+ if (! validateCharsAndCopy(inputLineValidated,
+ sizeof(inputLineValidated),
+ inputLine,
+ lineLength,
+ lineLengthValidated))
+ {
+ if (doTrace)
+ {
+ printf("(%d) EVENT_LOG_READER invalid UTF8 char line %d\n",
+ pid, lineNumber);
+ fflush(stdout);
+ }
+ setParseError(CharConversionError, lineParseError);
+ }
+
+
+ // try to read the timestamp at the beginning of the line. Example:
+ // 2014-10-30 20:49:53,252
+ numItems = sscanf(currField,
+ "%4d-%2d-%2d %2d:%2d:%2d %1[,.] %6d%n",
+ &year, &month, &day, &hour, &minute,
+ &second, fractionSeparator, &fraction, &numChars);
+
+ if (numItems == 8)
+ {
+ // We were able to read a timestamp field
+
+ // Emit previous row, we have seen the start of next row
+ if (haveRowToEmit)
+ {
+ // set final two columns, message text and parse error
+ setCharOutputColumn(info,
+ MESSAGE_COLNUM,
+ messageTextField.data(),
+ rowParseStatus);
+ if (addFileColumns)
+ setCharOutputColumn(info,
+ PARSE_STATUS_COLNUM,
+ rowParseStatus.c_str(),
+ rowParseStatus);
+ emitRow(info);
+ if (doTrace)
+ {
+ printf("(%d) EVENT_LOG_READER emit\n", pid);
+ fflush(stdout);
+ }
+ }
+
+ // we read a line that will produce an output row, initialize
+ // some fields for this output row
+ haveRowToEmit = 0;
+ appendPos = 0;
+ messageTextField.erase();
+ rowParseStatus = lineParseError;
+
+ // When we see a comma between time and fraction, we interpret
+ // that as a fraction that is specified in milliseconds. Convert
+ // to microseconds. When it's specified with a dot, we interpret
+ // the fraction as microseconds (SQL syntax).
+ if (*fractionSeparator == ',')
+ fraction *= 1000;
+
+ char buf[100];
+ snprintf(buf, sizeof(buf),
+ "%04d-%02d-%02d %02d:%02d:%02d.%06d",
+ year, month, day, hour, minute, second, fraction);
+ setCharOutputColumn(info, LOG_TS_COLNUM, buf, rowParseStatus);
+ }
+ else
+ {
+ if (!haveRowToEmit)
+ {
+ // no valid timestamp and we did not have a previous line
+ // with a timestamp
+ if (numItems > 6)
+ numItems = 6;
+
+ if (doTrace)
+ {
+ printf("(%d) EVENT_LOG_READER Read only %d of 7 timestamp fields: %s\n",
+ pid, numItems, currField);
+ fflush(stdout);
+ }
+
+ // return a NULL value if we fail to parse the timestamp
+ info.out().setNull(LOG_TS_COLNUM);
+ rowParseStatus = lineParseError;
+ setParseError(FieldParserError, rowParseStatus);
+ }
+ else
+ {
+ // no valid timestamp and we have a row to emit
+ // consider this line as a continuation of previous row
+ // (add a blank instead of a line feed, though)
+ messageTextField += " ";
+ messageTextField += currField;
+
+ // add any parse errors from this line
+ for (std::string::iterator it = lineParseError.begin();
+ it != lineParseError.end();
+ it++)
+ setParseError(*it, rowParseStatus);
+ }
+ }
+
+ if (!haveRowToEmit)
+ {
+ // skip over the information already read
+ currField = currField + numChars;
+
+ // skip over the comma
+ currField = strstr(currField, ",");
+ if (currField)
+ currField++;
+ else
+ {
+ // did not find a comma delimiter, this is a parse
+ // error, produce NULL values for remaining columns
+ // except the message
+ if (numChars > 0)
+ currField = inputLineValidated + (numChars-1);
+ else
+ currField = inputLineValidated;
+ *currField = 0;
+ messageTextField =
+ (numChars < lineLengthValidated) ? currField+1 : currField;
+ setParseError(FieldParserError, rowParseStatus);
+ }
+ }
+
+ // read columns 2: SEVERITY - 9: QUERY_ID
+ for (columnNum = 2; (columnNum <= 9 && !haveRowToEmit); columnNum++)
+ {
+ // find the next comma, the end of our field value
+ char *endOfField = strstr(currField, ",");
+ char *startOfVal = NULL;
+
+ if (endOfField != NULL)
+ {
+ startOfVal = (endOfField != currField ? endOfField-1 : currField);
+
+ // next field starts after the comma
+ nextField = endOfField + 1;
+
+ // back up before the trailing comma, if the value is not empty
+ if (endOfField != currField)
+ endOfField--;
+
+ // remove trailing blanks
+ while (*endOfField == ' ')
+ endOfField--;
+
+ // place a nul-terminator at the end of the field
+ // (this overwrites the comma or a trailing blank)
+ if (endOfField != currField)
+ endOfField[1] = 0;
+ else
+ endOfField[0] = 0; // empty field
+
+ // from the end, go back to the preceding ":" or ","
+ // or until we reach the start of the current field
+ // This way, we skip field names like "CPU:" in CPU: 3
+ while (*startOfVal != ':' &&
+ *startOfVal != ',' &&
+ startOfVal != currField)
+ startOfVal--;
+
+ // skip the ":"
+ if (startOfVal != currField)
+ startOfVal++;
+
+ // skip leading blanks
+ while (*startOfVal == ' ')
+ startOfVal++;
+ } // found a comma delimiter
+ else
+ {
+ // Did not find a comma delimiter. This could be a
+ // parse error or a missing optional column,
+ // produce NULL values for remaining columns
+ // except the message
+ if (currField != inputLineValidated)
+ // back up, since currField is now pointing
+ // at the first character of the message text
+ currField--;
+ *currField = 0;
+ startOfVal = currField;
+ nextField = currField;
+ // if there is any text left, point to it
+ // (after currField, which points to a NUL byte)
+ // otherwise set the message text field to an empty string
+ if (messageTextField.empty())
+ messageTextField =
+ (currField-inputLineValidated < lineLengthValidated) ?
+ currField+1 : currField;
+ setParseError(FieldParserError, rowParseStatus);
+ }
+
+ // now that we have the non-blank portion of the value,
+ // copy it into the output column
+ switch (columnNum)
+ {
+ case 2:
+ setCharOutputColumn(info,
+ SEVERITY_COLNUM,
+ startOfVal,
+ rowParseStatus);
+ break;
+
+ case 3:
+ setCharOutputColumn(info,
+ COMPONENT_COLNUM,
+ startOfVal,
+ rowParseStatus);
+ break;
+
+ case 4:
+ setIntOutputColumn(info,
+ NODE_NUMBER_COLNUM,
+ startOfVal,
+ rowParseStatus);
+ break;
+
+ case 5:
+ setIntOutputColumn(info,
+ CPU_COLNUM,
+ startOfVal,
+ rowParseStatus);
+ break;
+
+ case 6:
+ setIntOutputColumn(info,
+ PIN_COLNUM,
+ startOfVal,
+ rowParseStatus);
+ break;
+
+ case 7:
+ setCharOutputColumn(info,
+ PROCESS_NAME_COLNUM,
+ startOfVal,
+ rowParseStatus);
+ break;
+
+ case 8:
+ setIntOutputColumn(info,
+ SQL_CODE_COLNUM,
+ startOfVal,
+ rowParseStatus);
+ break;
+
+ case 9:
+ setCharOutputColumn(info,
+ QUERY_ID_COLNUM,
+ startOfVal,
+ rowParseStatus);
+ // we read all required fields,
+ // next field is the message text
+ if (messageTextField.empty())
+ messageTextField = nextField;
+ break;
+ }
+
+ currField = nextField;
+ } // loop over column numbers 2-9
+
// do some final adjustments
- if (!haveRowToEmit)
- {
- int numLeadingBlanks = messageTextField.find_first_not_of(' ');
-
- if (numLeadingBlanks > 0 && numLeadingBlanks != std::string::npos)
- messageTextField.erase(0, numLeadingBlanks);
-
- if (addFileColumns)
- info.out().setInt(LOG_FILE_LINE_COLNUM, lineNumber);
- }
-
- haveRowToEmit = 1;
- } // loop over the lines of the file
-
+ if (!haveRowToEmit)
+ {
+ int numLeadingBlanks = messageTextField.find_first_not_of(' ');
+
+ if (numLeadingBlanks > 0 && numLeadingBlanks != std::string::npos)
+ messageTextField.erase(0, numLeadingBlanks);
+
+ if (addFileColumns)
+ info.out().setInt(LOG_FILE_LINE_COLNUM, lineNumber);
+ }
+
+ haveRowToEmit = 1;
+ } // loop over the lines of the file
+
if (haveRowToEmit)
- {
- // set final two columns, message text and parse error
- setCharOutputColumn(info,
- MESSAGE_COLNUM,
- messageTextField.data(),
- rowParseStatus);
- if (addFileColumns)
- setCharOutputColumn(info,
- PARSE_STATUS_COLNUM,
- rowParseStatus.c_str(),
- rowParseStatus);
- // Emit a row
- emitRow(info);
- if (doTrace)
- {
- printf("(%d) EVENT_LOG_READER emit\n", pid);
- fflush(stdout);
- }
- haveRowToEmit = 0;
+ {
+ // set final two columns, message text and parse error
+ setCharOutputColumn(info,
+ MESSAGE_COLNUM,
+ messageTextField.data(),
+ rowParseStatus);
+ if (addFileColumns)
+ setCharOutputColumn(info,
+ PARSE_STATUS_COLNUM,
+ rowParseStatus.c_str(),
+ rowParseStatus);
+ // Emit a row
+ emitRow(info);
+ if (doTrace)
+ {
+ printf("(%d) EVENT_LOG_READER emit\n", pid);
+ fflush(stdout);
+ }
+ haveRowToEmit = 0;
appendPos = 0;
- }
+ }
// Close the input file
if (infile_)
- {
- fclose(infile_);
- infile_ = NULL;
- }
+ {
+ fclose(infile_);
+ infile_ = NULL;
+ }
if (doTrace)
- {
- printf("(%d) EVENT_LOG_READER fclose\n", pid);
- fflush(stdout);
- }
-
+ {
+ printf("(%d) EVENT_LOG_READER fclose\n", pid);
+ fflush(stdout);
+ }
+
} // file name matched our pattern
- } // while (1) - list files in the directory
-
- closedir(logDir_);
- logDir_ = NULL;
+ } // while (1) - list files in the directory
+
+ closedir(logDir_);
+ logDir_ = NULL;
+ } // for numLogLocations
}
ReadCppEventsUDFInterface::~ReadCppEventsUDFInterface()
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/f8e41b95/dcs/bin/dcs-daemon.sh
----------------------------------------------------------------------
diff --git a/dcs/bin/dcs-daemon.sh b/dcs/bin/dcs-daemon.sh
index 1078a07..0061aec 100755
--- a/dcs/bin/dcs-daemon.sh
+++ b/dcs/bin/dcs-daemon.sh
@@ -174,8 +174,8 @@ case $startStop in
dcs_rotate_log $loggc
echo starting $command, logging to $logout
# Add to the command log file vital stats on our environment.
- echo "`date` Starting $command on `hostname`" >> $loglog
- echo "`ulimit -a`" >> $loglog 2>&1
+ # echo "`date` Starting $command on `hostname`" >> $loglog
+ # echo "`ulimit -a`" >> $loglog 2>&1
nohup nice -n $DCS_NICENESS "$DCS_HOME"/bin/dcs \
--config "${DCS_CONF_DIR}" \
$command "$@" $startStop > "$logout" 2>&1 < /dev/null &
@@ -188,7 +188,7 @@ case $startStop in
# kill -0 == see if the PID exists
if kill -0 `cat $pid` > /dev/null 2>&1; then
echo -n stopping $command
- echo "`date` Terminating $command" >> $loglog
+ # echo "`date` Terminating $command" >> $loglog
kill `cat $pid` > /dev/null 2>&1
while kill -0 `cat $pid` > /dev/null 2>&1; do
echo -n "."
[3/3] incubator-trafodion git commit: Merge remote branch
'origin/pr/140/head' into merge_trafodion140
Posted by sa...@apache.org.
Merge remote branch 'origin/pr/140/head' into merge_trafodion140
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/d060bef0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/d060bef0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/d060bef0
Branch: refs/heads/master
Commit: d060bef045fd4d0c074ec2db0f4ded1e01da4dd8
Parents: 2578f51 9dd48be
Author: Sandhya Sundaresan <sa...@apache.org>
Authored: Mon Oct 26 17:56:03 2015 +0000
Committer: Sandhya Sundaresan <sa...@apache.org>
Committed: Mon Oct 26 17:56:03 2015 +0000
----------------------------------------------------------------------
core/sqf/sql/scripts/genms | 3 +
core/sql/comexe/ComTdbHbaseAccess.cpp | 4 +-
core/sql/comexe/ComTdbHbaseAccess.h | 6 +
core/sql/executor/ExHbaseIUD.cpp | 6 +-
core/sql/generator/GenExplain.cpp | 8 +
core/sql/generator/GenRelUpdate.cpp | 9 +
core/sql/generator/GenUdr.cpp | 5 +-
core/sql/optimizer/NAColumn.cpp | 4 +-
core/sql/optimizer/NAColumn.h | 2 +-
core/sql/regress/hive/EXPECTED020 | 63 +-
core/sql/regress/hive/TEST020 | 27 +-
core/sql/sqlcomp/CmpSeabaseDDLtable.cpp | 2 +-
core/sql/sqlcomp/DefaultConstants.h | 1 +
core/sql/sqlcomp/nadefaults.cpp | 1 +
core/sql/sqludr/SqlUdrPredefLogReader.cpp | 983 +++++++++++++------------
dcs/bin/dcs-daemon.sh | 6 +-
16 files changed, 606 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/d060bef0/core/sql/generator/GenRelUpdate.cpp
----------------------------------------------------------------------
[2/3] incubator-trafodion git commit: Rework for issues found by Dave
and Selva. The flush size is now set in units of KB,
with a default of 1024KB. Explain will report it in rows. Size in rows,
as sent to the executor, is capped at 32767.
Posted by sa...@apache.org.
Rework for issues found by Dave and Selva.
The flush size is now set in units of KB, with a default of 1024KB.
Explain will report it in rows. Size in rows, as sent to the executor,
is capped at 32767.
Project: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/commit/9dd48bed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/tree/9dd48bed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-trafodion/diff/9dd48bed
Branch: refs/heads/master
Commit: 9dd48bed51df2a07a977eca2f13e2e68b4a599c4
Parents: f8e41b9
Author: Suresh Subbiah <su...@apache.org>
Authored: Mon Oct 26 17:09:42 2015 +0000
Committer: Suresh Subbiah <su...@apache.org>
Committed: Mon Oct 26 17:09:42 2015 +0000
----------------------------------------------------------------------
core/sql/executor/ExHbaseAccess.cpp | 4 ++--
core/sql/executor/ExHbaseAccess.h | 6 +++---
core/sql/generator/GenRelUpdate.cpp | 21 ++++++++-------------
core/sql/regress/hive/EXPECTED020 | 14 +++++++-------
core/sql/regress/hive/TEST020 | 4 ++--
core/sql/sqlcomp/DefaultConstants.h | 2 +-
core/sql/sqlcomp/nadefaults.cpp | 2 +-
7 files changed, 24 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/9dd48bed/core/sql/executor/ExHbaseAccess.cpp
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseAccess.cpp b/core/sql/executor/ExHbaseAccess.cpp
index 4bcbfdd..8e98daa 100644
--- a/core/sql/executor/ExHbaseAccess.cpp
+++ b/core/sql/executor/ExHbaseAccess.cpp
@@ -2315,7 +2315,7 @@ void ExHbaseAccessTcb::allocateDirectBufferForJNI(UInt32 rowLen)
void ExHbaseAccessTcb::allocateDirectRowBufferForJNI(
- short numCols, UInt16 maxRows)
+ short numCols, short maxRows)
{
UInt32 directBufferOverhead;
UInt32 maxRowLen;
@@ -2371,7 +2371,7 @@ short ExHbaseAccessTcb::patchDirectRowIDBuffers()
return numRowsInBuffer;
}
-void ExHbaseAccessTcb::allocateDirectRowIDBufferForJNI(UInt16 maxRows)
+void ExHbaseAccessTcb::allocateDirectRowIDBufferForJNI(short maxRows)
{
UInt32 rowIDLen;
UInt32 maxRowIDLen;
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/9dd48bed/core/sql/executor/ExHbaseAccess.h
----------------------------------------------------------------------
diff --git a/core/sql/executor/ExHbaseAccess.h b/core/sql/executor/ExHbaseAccess.h
index b7cf604..9271227 100644
--- a/core/sql/executor/ExHbaseAccess.h
+++ b/core/sql/executor/ExHbaseAccess.h
@@ -342,10 +342,10 @@ protected:
void setRowID(char *rowId, Lng32 rowIdLen);
void allocateDirectBufferForJNI(UInt32 rowLen);
void allocateDirectRowBufferForJNI(short numCols,
- UInt16 maxRows = 1);
+ short maxRows = 1);
short patchDirectRowBuffers();
short patchDirectRowIDBuffers();
- void allocateDirectRowIDBufferForJNI(UInt16 maxRows = 1);
+ void allocateDirectRowIDBufferForJNI(short maxRows = 1);
Lng32 copyColToDirectBuffer( BYTE *rowCurPtr,
char *colName, short colNameLen,
NABoolean prependNullVal, char nullVal,
@@ -464,7 +464,7 @@ protected:
//
BYTE *directRowBuffer_;
Lng32 directRowBufferLen_;
- UInt16 directBufferMaxRows_;
+ short directBufferMaxRows_;
// Structure to keep track of current row
HbaseStr row_;
// Structure to keep track of current position in direct row buffer
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/9dd48bed/core/sql/generator/GenRelUpdate.cpp
----------------------------------------------------------------------
diff --git a/core/sql/generator/GenRelUpdate.cpp b/core/sql/generator/GenRelUpdate.cpp
index 07944d3..6701094 100644
--- a/core/sql/generator/GenRelUpdate.cpp
+++ b/core/sql/generator/GenRelUpdate.cpp
@@ -2835,19 +2835,14 @@ short HbaseInsert::codeGen(Generator *generator)
hbasescan_tdb->setNoDuplicates(CmpCommon::getDefault(TRAF_LOAD_PREP_SKIP_DUPLICATES) == DF_OFF);
hbasescan_tdb->setMaxHFileSize(CmpCommon::getDefaultLong(TRAF_LOAD_MAX_HFILE_SIZE));
- ULng32 loadFlushSize = getDefault(TRAF_LOAD_FLUSH_SIZE_IN_ROWS);
- if (loadFlushSize == 0)
- {// user has not specified a size, assume 1MB buffer is optimal
-
- loadFlushSize = (1024*1024)/hbasescan_tdb->getRowLen() ;
- if (loadFlushSize > getMaxCardEst().value()) {
- // for small tables go back to previous default
- loadFlushSize = getDefault(HBASE_ROWSET_VSBB_SIZE);
- }
- }
- if (loadFlushSize > USHRT_MAX) // largest flush size, runtime cannot
- loadFlushSize = USHRT_MAX; // handle higher values without code change
- hbasescan_tdb->setTrafLoadFlushSize(loadFlushSize);
+ ULng32 loadFlushSizeinKB = getDefault(TRAF_LOAD_FLUSH_SIZE_IN_KB);
+ ULng32 loadFlushSizeinRows = 0;
+ loadFlushSizeinRows = (loadFlushSizeinKB*1024)/hbasescan_tdb->getRowLen() ;
+ // largest flush size, runtime cannot handle higher values
+ // without code change
+ if (loadFlushSizeinRows >= USHRT_MAX/2)
+ loadFlushSizeinRows = ((USHRT_MAX/2)-1);
+ hbasescan_tdb->setTrafLoadFlushSize(loadFlushSizeinRows);
// For sample file, set the sample location in HDFS and the sampling rate.
// Move later, when sampling not limited to bulk loads.
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/9dd48bed/core/sql/regress/hive/EXPECTED020
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/EXPECTED020 b/core/sql/regress/hive/EXPECTED020
index 8f07fda..636d75e 100644
--- a/core/sql/regress/hive/EXPECTED020
+++ b/core/sql/regress/hive/EXPECTED020
@@ -20,8 +20,8 @@
+> cast(LEFT_CHILD_SEQ_NUM as char(2)) lc,
+> cast(RIGHT_CHILD_SEQ_NUM as char(2)) rc,
+> substring
-+> (substring(tname from (1+locate('.',tname))),
-+> (locate('.',substring(tname from (1+locate('.',tname))))),
++> (substring(substring(tname from (1+locate('.',tname))),1,case locate(')',tname) when 0 then 0 else locate(')',substring(tname from (1+locate('.',tname))))-1 end),
++> (locate('.',substring(tname from (1+locate('.',tname)))))+1,
+> 10
+> ) tab_name
+> from table (explain(NULL,'XX'))
@@ -35,7 +35,7 @@
>>invoke hive.hive.store_orc;
-- Definition of hive table STORE_ORC
--- Definition current Wed Oct 21 00:08:18 2015
+-- Definition current Mon Oct 26 16:10:15 2015
(
S_STORE_SK INT
@@ -159,8 +159,8 @@ S OPERATOR LC RC TAB_NAME
04 ROOT 3 ?
03 HYBRID_HASH_JOIN 2 1
-02 HIVE_SCAN ? ? .STORE_ORC
-01 HIVE_SCAN ? ? .STORE_ORC
+02 HIVE_SCAN ? ? STORE_ORC
+01 HIVE_SCAN ? ? STORE_ORC
--- 4 row(s) selected.
>>
@@ -198,8 +198,8 @@ S OPERATOR LC RC TAB_NAME
04 ROOT 3 ?
03 NESTED_JOIN 1 2
-02 HIVE_SCAN ? ? .STORE_ORC
-01 HIVE_SCAN ? ? .STORE)
+02 HIVE_SCAN ? ? STORE_ORC
+01 HIVE_SCAN ? ? STORE
--- 4 row(s) selected.
>>
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/9dd48bed/core/sql/regress/hive/TEST020
----------------------------------------------------------------------
diff --git a/core/sql/regress/hive/TEST020 b/core/sql/regress/hive/TEST020
index ded1d98..50615db 100644
--- a/core/sql/regress/hive/TEST020
+++ b/core/sql/regress/hive/TEST020
@@ -42,8 +42,8 @@ prepare explainIt from
cast(LEFT_CHILD_SEQ_NUM as char(2)) lc,
cast(RIGHT_CHILD_SEQ_NUM as char(2)) rc,
substring
- (substring(tname from (1+locate('.',tname))),
- (locate('.',substring(tname from (1+locate('.',tname))))),
+ (substring(substring(tname from (1+locate('.',tname))),1,case locate(')',tname) when 0 then 0 else locate(')',substring(tname from (1+locate('.',tname))))-1 end),
+ (locate('.',substring(tname from (1+locate('.',tname)))))+1,
10
) tab_name
from table (explain(NULL,'XX'))
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/9dd48bed/core/sql/sqlcomp/DefaultConstants.h
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/DefaultConstants.h b/core/sql/sqlcomp/DefaultConstants.h
index cbe4fcc..fe28164 100644
--- a/core/sql/sqlcomp/DefaultConstants.h
+++ b/core/sql/sqlcomp/DefaultConstants.h
@@ -3766,7 +3766,7 @@ enum DefaultConstants
// costing code has broader exposure.
HBASE_DELETE_COSTING,
HBASE_UPDATE_COSTING,
- TRAF_LOAD_FLUSH_SIZE_IN_ROWS,
+ TRAF_LOAD_FLUSH_SIZE_IN_KB,
// This enum constant must be the LAST one in the list; it's a count,
// not an Attribute (it's not IN DefaultDefaults; it's the SIZE of it)!
http://git-wip-us.apache.org/repos/asf/incubator-trafodion/blob/9dd48bed/core/sql/sqlcomp/nadefaults.cpp
----------------------------------------------------------------------
diff --git a/core/sql/sqlcomp/nadefaults.cpp b/core/sql/sqlcomp/nadefaults.cpp
index e3ad370..95b26b6 100644
--- a/core/sql/sqlcomp/nadefaults.cpp
+++ b/core/sql/sqlcomp/nadefaults.cpp
@@ -3317,7 +3317,7 @@ XDDkwd__(SUBQUERY_UNNESTING, "ON"),
DD_____(TRAF_LOAD_ERROR_COUNT_ID, "" ),
DD_____(TRAF_LOAD_ERROR_COUNT_TABLE, "ERRORCOUNTER" ),
DD_____(TRAF_LOAD_ERROR_LOGGING_LOCATION, "/bulkload/logs/" ),
- DDint__(TRAF_LOAD_FLUSH_SIZE_IN_ROWS, "0"), // in # rows
+ DDint__(TRAF_LOAD_FLUSH_SIZE_IN_KB, "1024"),
DDkwd__(TRAF_LOAD_FORCE_CIF, "ON"),
DDkwd__(TRAF_LOAD_LOG_ERROR_ROWS, "OFF"),
DDint__(TRAF_LOAD_MAX_ERROR_ROWS, "0"),