You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by in...@apache.org on 2018/04/03 10:52:52 UTC
[2/2] incubator-hawq git commit: HAWQ-1602. AO table data vectorized
scan
HAWQ-1602. AO table data vectorized scan
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/c72e5894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/c72e5894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/c72e5894
Branch: refs/heads/master
Commit: c72e58946c12afeea75d883d47f1df14f587868d
Parents: cfee2c5
Author: Weinan Wang <we...@pivotal.io>
Authored: Fri Mar 30 16:45:51 2018 +0800
Committer: interma <in...@outlook.com>
Committed: Tue Apr 3 18:51:45 2018 +0800
----------------------------------------------------------------------
contrib/vexecutor/Makefile | 2 +-
contrib/vexecutor/ao_reader.c | 115 +++++++++++++++++++++
contrib/vexecutor/ao_reader.h | 18 ++++
contrib/vexecutor/execVScan.c | 9 +-
contrib/vexecutor/parquet_reader.c | 10 +-
contrib/vexecutor/vcheck.h | 18 ++--
src/backend/access/appendonly/appendonlyam.c | 9 +-
src/test/feature/vexecutor/test_vexecutor.cpp | 38 ++++++-
8 files changed, 201 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/Makefile
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/Makefile b/contrib/vexecutor/Makefile
index 0eae623..cb6cdd4 100644
--- a/contrib/vexecutor/Makefile
+++ b/contrib/vexecutor/Makefile
@@ -17,7 +17,7 @@
MODULE_big = vexecutor
-OBJS = vexecutor.o vadt.o vcheck.o tuplebatch.o execVScan.o execVQual.o parquet_reader.o
+OBJS = vexecutor.o vadt.o vcheck.o tuplebatch.o execVScan.o execVQual.o parquet_reader.o ao_reader.o
PG_CXXFLAGS = -Wall -O0 -g -std=c++11
PG_LIBS = $(libpq_pgport)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/ao_reader.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/ao_reader.c b/contrib/vexecutor/ao_reader.c
new file mode 100644
index 0000000..bdba565
--- /dev/null
+++ b/contrib/vexecutor/ao_reader.c
@@ -0,0 +1,115 @@
+#include "ao_reader.h"
+#include "tuplebatch.h"
+#include "utils/datum.h"
+
+/*
+ * appendonlygettup() is defined in src/backend/access/appendonly/appendonlyam.c.
+ * This patch drops its "static" qualifier there but does not publish it in a
+ * shared header, so the prototype is repeated by hand here.
+ * NOTE(review): consider moving this declaration into cdb/cdbappendonlyam.h so
+ * the two copies cannot drift apart.
+ */
+extern MemTuple
+appendonlygettup(AppendOnlyScanDesc scan,
+ ScanDirection dir __attribute__((unused)),
+ int nkeys,
+ ScanKey key,
+ TupleTableSlot *slot);
+
+/*
+ * Start a vectorized append-only scan.
+ *
+ * Performs the regular AO scan setup first, then allocates the per-scan
+ * vectorized state: a zeroed aoinfo (so isDone starts out false) and a
+ * projection mask with one flag per column of the scan slot's tuple batch.
+ * A column's flag is set when the plan's target list or qual references it.
+ */
+void
+BeginVScanAppendOnlyRelation(ScanState *scanState)
+{
+    VectorizedState *vstate;
+    TupleBatch  batch;
+    Plan       *plan;
+    aoinfo     *aoState;
+
+    BeginScanAppendOnlyRelation(scanState);
+
+    vstate = (VectorizedState *) scanState->ps.vectorized;
+    batch = scanState->ss_ScanTupleSlot->PRIVATE_tb;
+    plan = scanState->ps.plan;
+
+    aoState = palloc0(sizeof(aoinfo));
+    aoState->proj = palloc0(sizeof(bool) * batch->ncols);
+    vstate->ao = aoState;
+
+    /* A column must be read if either the projection or the filter uses it. */
+    GetNeededColumnsForScan((Node *) plan->targetlist, aoState->proj, batch->ncols);
+    GetNeededColumnsForScan((Node *) plan->qual, aoState->proj, batch->ncols);
+}
+
+/*
+ * Finish a vectorized append-only scan.
+ *
+ * Releases the vectorized state allocated by BeginVScanAppendOnlyRelation
+ * (projection mask and aoinfo), then runs the regular AO scan teardown.
+ */
+void
+EndVScanAppendOnlyRelation(ScanState *scanState)
+{
+    VectorizedState *vs = (VectorizedState *) scanState->ps.vectorized;
+
+    if (vs->ao != NULL)
+    {
+        pfree(vs->ao->proj);
+        pfree(vs->ao);
+        /*
+         * vs outlives this call, so do not leave a dangling pointer behind:
+         * a repeated End becomes a no-op instead of a double pfree.
+         */
+        vs->ao = NULL;
+    }
+
+    EndScanAppendOnlyRelation(scanState);
+}
+
+/*
+ * Read the next tuple of an append-only scan into the scan tuple slot.
+ *
+ * Returns the scan slot when a tuple was read (and counts it in pgstat).
+ * Returns NULL once appendonlygettup() has no more tuples, after first
+ * setting vs->ao->isDone so later batch requests short-circuit.
+ */
+static TupleTableSlot *
+AOScanNext(ScanState *scanState)
+{
+    AppendOnlyScanState *aoNode;
+    VectorizedState *vstate;
+    AppendOnlyScanDesc desc;
+    TupleTableSlot *scanSlot;
+    MemTuple    memTuple;
+
+    Assert(IsA(scanState, TableScanState) ||
+           IsA(scanState, DynamicTableScanState));
+
+    aoNode = (AppendOnlyScanState *) scanState;
+    vstate = scanState->ps.vectorized;
+
+    Assert((aoNode->ss.scan_state & SCAN_SCAN) != 0);
+
+    desc = aoNode->aos_ScanDesc;
+    scanSlot = aoNode->ss.ss_ScanTupleSlot;
+
+    memTuple = appendonlygettup(desc,
+                                aoNode->ss.ps.state->es_direction,
+                                desc->aos_nkeys,
+                                desc->aos_key,
+                                scanSlot);
+
+    if (memTuple != NULL)
+    {
+        pgstat_count_heap_getnext(desc->aos_rd);
+        return scanSlot;
+    }
+
+    /* End of relation: remember it so the batch loop stops calling us. */
+    vstate->ao->isDone = true;
+    return NULL;
+}
+
+
+/*
+ * Fill the scan slot's tuple batch with up to tb->batchsize rows read from an
+ * append-only relation.
+ *
+ * Only columns flagged in vs->ao->proj are materialized. Each such column's
+ * vector is created with tbCreateColumn() on first use.
+ *
+ * Returns the scan tuple slot. It is cleared when no rows were read (including
+ * every call after the scan is exhausted). Otherwise it is marked as a virtual
+ * tuple with tb->ncols valid attributes, and tb->nrows holds the row count.
+ */
+TupleTableSlot *
+AppendOnlyVScanNext(ScanState *scanState)
+{
+    TupleTableSlot *scanSlot = scanState->ss_ScanTupleSlot;
+    TupleBatch  tb = (TupleBatch) scanSlot->PRIVATE_tb;
+    VectorizedState *vs = scanState->ps.vectorized;
+
+    if (vs->ao->isDone)
+    {
+        ExecClearTuple(scanSlot);
+        return scanSlot;
+    }
+
+    for (tb->nrows = 0; tb->nrows < tb->batchsize; tb->nrows++)
+    {
+        /* AOScanNext hands back this same scan slot, or NULL at end of scan. */
+        TupleTableSlot *slot = AOScanNext(scanState);
+        int         i;
+
+        if (TupIsNull(slot))
+            break;
+
+        for (i = 0; i < tb->ncols; i++)
+        {
+            Form_pg_attribute bindAttr;
+            Oid         hawqVTypeID;
+            Datum      *ptr;
+            bool       *isnull;
+
+            if (!vs->ao->proj[i])
+                continue;
+
+            hawqVTypeID = GetVtype(slot->tts_tupleDescriptor->attrs[i]->atttypid);
+            if (!tb->datagroup[i])
+                tbCreateColumn(tb, i, hawqVTypeID);
+
+            /* gettypeptr() returns the element's address packed in a Datum. */
+            ptr = (Datum *) DatumGetPointer(
+                GetVFunc(hawqVTypeID)->gettypeptr(tb->datagroup[i], tb->nrows));
+            isnull = &(tb->datagroup[i]->isnull[tb->nrows]);
+            *ptr = slot_getattr(slot, i + 1, isnull);
+
+            /*
+             * By-reference values point into the AO read buffer, which may be
+             * released before this batch is consumed, so deep-copy them out.
+             * A NULL attribute carries no payload (its Datum is not a valid
+             * pointer), so it must not be passed to datumCopy().
+             *
+             * NOTE(review): the copies are palloc'd in the current memory
+             * context and are not freed here when the next batch overwrites
+             * the slot. Confirm a per-batch context reset reclaims them.
+             */
+            bindAttr = slot->tts_mt_bind->tupdesc->attrs[i];
+            if (!*isnull && !bindAttr->attbyval)
+                *ptr = datumCopy(*ptr, bindAttr->attbyval, bindAttr->attlen);
+        }
+    }
+
+    if (tb->nrows == 0)
+        ExecClearTuple(scanSlot);
+    else
+        TupSetVirtualTupleNValid(scanSlot, tb->ncols);
+    return scanSlot;
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/ao_reader.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/ao_reader.h b/contrib/vexecutor/ao_reader.h
new file mode 100644
index 0000000..5f0f91c
--- /dev/null
+++ b/contrib/vexecutor/ao_reader.h
@@ -0,0 +1,18 @@
+/*
+ * ao_reader.h
+ *     Vectorized scan callbacks for append-only (AO) tables, used by the
+ *     vexecutor scan-method table in execVScan.c.
+ */
+#ifndef AO_READER_H
+#define AO_READER_H
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "cdb/cdbappendonlyam.h"
+#include "cdb/cdbvars.h"
+#include "executor/execdebug.h"
+#include "executor/nodeAppendOnlyscan.h"
+
+/* Regular AO scan setup plus allocation of the vectorized projection state. */
+void
+BeginVScanAppendOnlyRelation(ScanState *scanState);
+
+/* Fill the scan slot's tuple batch; the slot is cleared when no rows remain. */
+TupleTableSlot *AppendOnlyVScanNext(ScanState *scanState);
+
+/* Free the vectorized projection state, then run the regular AO scan teardown. */
+void
+EndVScanAppendOnlyRelation(ScanState *scanState);
+
+#endif   /* AO_READER_H */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/execVScan.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/execVScan.c b/contrib/vexecutor/execVScan.c
index 44a6854..8779e14 100644
--- a/contrib/vexecutor/execVScan.c
+++ b/contrib/vexecutor/execVScan.c
@@ -21,6 +21,7 @@
#include "miscadmin.h"
#include "execVQual.h"
#include "parquet_reader.h"
+#include "ao_reader.h"
static TupleTableSlot*
ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd);
@@ -35,8 +36,8 @@ getVScanMethod(int tableType)
},
//APPENDONLYSCAN
{
- &AppendOnlyScanNext, &BeginScanAppendOnlyRelation, &EndScanAppendOnlyRelation,
- &ReScanAppendOnlyRelation, &MarkRestrNotAllowed, &MarkRestrNotAllowed
+ &AppendOnlyVScanNext, &BeginVScanAppendOnlyRelation, &EndVScanAppendOnlyRelation,
+ NULL,NULL,NULL
},
//PARQUETSCAN
{
@@ -130,6 +131,7 @@ ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd)
* storage allocated in the previous tuple cycle.
*/
econtext = node->ps.ps_ExprContext;
+
ResetExprContext(econtext);
/*
@@ -182,8 +184,7 @@ ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd)
* and return it.
*/
((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->nrows = ((TupleBatch)slot->PRIVATE_tb)->nrows;
- memcpy(((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->skip,
- ((TupleBatch)slot->PRIVATE_tb)->skip,sizeof(bool) * ((TupleBatch)slot->PRIVATE_tb)->nrows);
+ ((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->skip = ((TupleBatch)slot->PRIVATE_tb)->skip;
return ExecVProject(projInfo, NULL);
}
else
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/parquet_reader.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/parquet_reader.c b/contrib/vexecutor/parquet_reader.c
index 301bd65..ca58a09 100644
--- a/contrib/vexecutor/parquet_reader.c
+++ b/contrib/vexecutor/parquet_reader.c
@@ -90,7 +90,7 @@ parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, TupleTableSlot *
/*
* Get next tuple batch from current row group into slot.
*
- * Return false if current row group has no tuple left, true otherwise.
+ * Return the number of tuples fetched into the slot's tuple batch (0 once the row group is exhausted).
*/
static int
ParquetRowGroupReader_ScanNextTupleBatch(
@@ -105,14 +105,14 @@ ParquetRowGroupReader_ScanNextTupleBatch(
if (rowGroupReader->rowRead >= rowGroupReader->rowCount)
{
ParquetRowGroupReader_FinishedScanRowGroup(rowGroupReader);
- return false;
+ return 0;
}
/*
* get the next item (tuple) from the row group
*/
int ncol = slot->tts_tupleDescriptor->natts;
- TupleBatch tb = (TupleBatch )slot->PRIVATE_tb;
+ TupleBatch tb = (TupleBatch )slot->PRIVATE_tb;
tb->nrows = 0;
if (rowGroupReader->rowRead + tb->batchsize > rowGroupReader->rowCount) {
@@ -131,12 +131,12 @@ ParquetRowGroupReader_ScanNextTupleBatch(
continue;
Oid hawqTypeID = tupDesc->attrs[i]->atttypid;
- Oid hawqVTypeID = GetVtype(hawqTypeID);
+ Oid hawqVTypeID = GetVtype(hawqTypeID);
if(!tb->datagroup[i])
tbCreateColumn(tb,i,hawqVTypeID);
vheader* header = tb->datagroup[i];
- header->dim = tb->nrows;
+ header->dim = tb->nrows;
ParquetColumnReader *nextReader =
&rowGroupReader->columnReaders[colReaderIndex];
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/vcheck.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/vcheck.h b/contrib/vexecutor/vcheck.h
index 9b3d18c..bbeb9bc 100644
--- a/contrib/vexecutor/vcheck.h
+++ b/contrib/vexecutor/vcheck.h
@@ -24,21 +24,27 @@
#include "nodes/execnodes.h"
typedef struct vFuncMap
{
- Oid ntype;
- vheader* (* vtbuild)(int n);
- void (* vtfree)(vheader **vh);
+ Oid ntype;
+ vheader* (* vtbuild)(int n);
+ void (* vtfree)(vheader **vh);
Datum (* gettypeptr)(vheader *vh,int n);
void (* gettypevalue)(vheader *vh,int n,Datum *ptr);
- size_t (* vtsize)(vheader *vh);
- size_t (*serialization)(vheader* vh, unsigned char* buf);
- Datum (*deserialization)(unsigned char* buf,size_t* len);
+ size_t (* vtsize)(vheader *vh);
+ size_t (*serialization)(vheader* vh, unsigned char* buf);
+ Datum (*deserialization)(unsigned char* buf,size_t* len);
}vFuncMap;
+/* Per-scan state of a vectorized append-only table scan (used by ao_reader.c). */
+typedef struct aoinfo {
+ bool* proj;   /* one flag per column: true if the scan must materialize it */
+ bool isDone;  /* set once the underlying AO scan has returned no more tuples */
+} aoinfo;
+
/* vectorized executor state */
typedef struct VectorizedState
{
bool vectorized;
PlanState *parent;
+ aoinfo *ao;
}VectorizedState;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/src/backend/access/appendonly/appendonlyam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c
index a7e88d2..e39b516 100644
--- a/src/backend/access/appendonly/appendonlyam.c
+++ b/src/backend/access/appendonly/appendonlyam.c
@@ -107,6 +107,13 @@ typedef enum AoExecutorBlockKind
MaxAoExecutorBlockKind /* must always be last */
} AoExecutorBlockKind;
+/*
+ * No longer static: the vectorized executor (contrib/vexecutor/ao_reader.c)
+ * calls this directly and carries its own copy of this prototype, so any
+ * signature change here must be mirrored there.
+ */
+MemTuple
+appendonlygettup(AppendOnlyScanDesc scan,
+ ScanDirection dir __attribute__((unused)),
+ int nkeys,
+ ScanKey key,
+ TupleTableSlot *slot);
+
static void
AppendOnlyExecutionReadBlock_SetSegmentFileNum(
AppendOnlyExecutorReadBlock *executorReadBlock,
@@ -1214,7 +1221,7 @@ LABEL_START_GETNEXTBLOCK:
* the scankeys.
* ----------------
*/
-static MemTuple
+MemTuple
appendonlygettup(AppendOnlyScanDesc scan,
ScanDirection dir __attribute__((unused)),
int nkeys,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/src/test/feature/vexecutor/test_vexecutor.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/vexecutor/test_vexecutor.cpp b/src/test/feature/vexecutor/test_vexecutor.cpp
index 08c4300..088784b 100644
--- a/src/test/feature/vexecutor/test_vexecutor.cpp
+++ b/src/test/feature/vexecutor/test_vexecutor.cpp
@@ -102,4 +102,40 @@ TEST_F(TestVexecutor, scanframework)
"vexecutor/ans/scan1.ans");
util.execute("drop table test1");
-}
\ No newline at end of file
+}
+
+/*
+ * Vectorized scan of an append-only, snappy-compressed table: load the tenk
+ * dataset, run one scan with the default executor, then enable the vectorized
+ * executor and run vexecutor/sql/scan1.sql against vexecutor/ans/scan1.ans.
+ */
+TEST_F(TestVexecutor, scanAO)
+{
+    hawq::test::SQLUtility sqlUtil;
+    const std::string createTable =
+        "create table test1 ("
+        " unique1 int4,"
+        " unique2 int4,"
+        " two int4,"
+        " four int4,"
+        " ten int4,"
+        " twenty int4,"
+        " hundred int4,"
+        " thousand int4,"
+        " twothousand int4,"
+        " fivethous int4,"
+        " tenthous int4,"
+        " odd int4,"
+        " even int4,"
+        " stringu1 name,"
+        " stringu2 name,"
+        " string4 name) WITH (appendonly = true, compresstype = SNAPPY) DISTRIBUTED RANDOMLY;";
+
+    sqlUtil.execute("drop table if exists test1");
+    sqlUtil.execute(createTable);
+
+    const std::string rootPath = sqlUtil.getTestRootPath();
+    const std::string copyCommand =
+        "COPY test1 FROM '" + rootPath + "/vexecutor/data/tenk.data'";
+    std::cout << copyCommand << std::endl;
+    sqlUtil.execute(copyCommand);
+
+    // Baseline scan with the non-vectorized executor.
+    sqlUtil.execute("select unique1 from test1");
+
+    sqlUtil.execute("SET vectorized_executor_enable to on");
+    sqlUtil.execSQLFile("vexecutor/sql/scan1.sql",
+                        "vexecutor/ans/scan1.ans");
+
+    sqlUtil.execute("drop table test1");
+}