You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by in...@apache.org on 2018/04/03 10:52:52 UTC

[2/2] incubator-hawq git commit: HAWQ-1602. AO table data vectorized scan

HAWQ-1602. AO table data vectorized scan


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/c72e5894
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/c72e5894
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/c72e5894

Branch: refs/heads/master
Commit: c72e58946c12afeea75d883d47f1df14f587868d
Parents: cfee2c5
Author: Weinan Wang <we...@pivotal.io>
Authored: Fri Mar 30 16:45:51 2018 +0800
Committer: interma <in...@outlook.com>
Committed: Tue Apr 3 18:51:45 2018 +0800

----------------------------------------------------------------------
 contrib/vexecutor/Makefile                    |   2 +-
 contrib/vexecutor/ao_reader.c                 | 115 +++++++++++++++++++++
 contrib/vexecutor/ao_reader.h                 |  18 ++++
 contrib/vexecutor/execVScan.c                 |   9 +-
 contrib/vexecutor/parquet_reader.c            |  10 +-
 contrib/vexecutor/vcheck.h                    |  18 ++--
 src/backend/access/appendonly/appendonlyam.c  |   9 +-
 src/test/feature/vexecutor/test_vexecutor.cpp |  38 ++++++-
 8 files changed, 201 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/Makefile
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/Makefile b/contrib/vexecutor/Makefile
index 0eae623..cb6cdd4 100644
--- a/contrib/vexecutor/Makefile
+++ b/contrib/vexecutor/Makefile
@@ -17,7 +17,7 @@
 
 
 MODULE_big = vexecutor
-OBJS    = vexecutor.o vadt.o vcheck.o tuplebatch.o execVScan.o execVQual.o parquet_reader.o
+OBJS    = vexecutor.o vadt.o vcheck.o tuplebatch.o execVScan.o execVQual.o parquet_reader.o ao_reader.o
 
 PG_CXXFLAGS = -Wall -O0 -g -std=c++11
 PG_LIBS = $(libpq_pgport) 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/ao_reader.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/ao_reader.c b/contrib/vexecutor/ao_reader.c
new file mode 100644
index 0000000..bdba565
--- /dev/null
+++ b/contrib/vexecutor/ao_reader.c
@@ -0,0 +1,115 @@
+#include "ao_reader.h"
+#include "tuplebatch.h"
+#include "utils/datum.h"
+
+/*
+ * appendonlygettup() is defined in src/backend/access/appendonly/appendonlyam.c;
+ * this commit drops its `static` qualifier so the vectorized reader below can
+ * fetch one MemTuple at a time straight from the AO access method.
+ * NOTE(review): the same prototype is repeated by hand in appendonlyam.c;
+ * declaring it once in cdb/cdbappendonlyam.h would keep the copies in sync.
+ */
+extern  MemTuple
+appendonlygettup(AppendOnlyScanDesc scan,
+                 ScanDirection dir __attribute__((unused)),
+                 int nkeys,
+                 ScanKey key,
+                 TupleTableSlot *slot);
+
+/*
+ * BeginVScanAppendOnlyRelation
+ *		Start a vectorized scan over an append-only relation.
+ *
+ * Performs the regular AO scan setup, then hangs an aoinfo off the node's
+ * VectorizedState recording which columns (0-based) the plan's target list
+ * or quals reference; AppendOnlyVScanNext() materializes only those columns.
+ */
+void
+BeginVScanAppendOnlyRelation(ScanState *scanState)
+{
+	VectorizedState *vstate;
+	TupleBatch	batch;
+	aoinfo	   *info;
+	Plan	   *plan;
+
+	BeginScanAppendOnlyRelation(scanState);
+
+	vstate = (VectorizedState *) scanState->ps.vectorized;
+	batch = scanState->ss_ScanTupleSlot->PRIVATE_tb;
+	plan = scanState->ps.plan;
+
+	/* palloc0 leaves isDone = false and every proj[] entry = false */
+	info = palloc0(sizeof(aoinfo));
+	info->proj = palloc0(sizeof(bool) * batch->ncols);
+	vstate->ao = info;
+
+	GetNeededColumnsForScan((Node *) plan->targetlist, info->proj, batch->ncols);
+	GetNeededColumnsForScan((Node *) plan->qual, info->proj, batch->ncols);
+}
+
+/*
+ * EndVScanAppendOnlyRelation
+ *		Tear down a vectorized AO scan: release the aoinfo allocated by
+ *		BeginVScanAppendOnlyRelation(), then run the regular AO scan cleanup.
+ */
+void
+EndVScanAppendOnlyRelation(ScanState *scanState)
+{
+	aoinfo	   *info = ((VectorizedState *) scanState->ps.vectorized)->ao;
+
+	pfree(info->proj);
+	pfree(info);
+
+	EndScanAppendOnlyRelation(scanState);
+}
+
+/*
+ * AOScanNext
+ *		Fetch the next single tuple of an AO scan into the node's scan slot.
+ *
+ * Returns the scan tuple slot when a tuple was produced. When
+ * appendonlygettup() reports no more tuples, marks the vectorized scan as
+ * finished (ao->isDone) and returns NULL; the slot is not cleared here.
+ */
+static TupleTableSlot *
+AOScanNext(ScanState *scanState)
+{
+	AppendOnlyScanState *aoState;
+	VectorizedState *vstate;
+	AppendOnlyScanDesc desc;
+	TupleTableSlot *scanSlot;
+	MemTuple	tuple;
+
+	Assert(IsA(scanState, TableScanState) ||
+		   IsA(scanState, DynamicTableScanState));
+
+	aoState = (AppendOnlyScanState *) scanState;
+	vstate = scanState->ps.vectorized;
+
+	Assert((aoState->ss.scan_state & SCAN_SCAN) != 0);
+
+	desc = aoState->aos_ScanDesc;
+	scanSlot = aoState->ss.ss_ScanTupleSlot;
+
+	tuple = appendonlygettup(desc, aoState->ss.ps.state->es_direction,
+							 desc->aos_nkeys, desc->aos_key, scanSlot);
+	if (tuple != NULL)
+	{
+		pgstat_count_heap_getnext(desc->aos_rd);
+		return scanSlot;
+	}
+
+	/* End of relation: later AppendOnlyVScanNext() calls short-circuit. */
+	vstate->ao->isDone = true;
+	return NULL;
+}
+
+
+/*
+ * AppendOnlyVScanNext
+ *		Fill the scan slot's TupleBatch with up to batchsize rows from an
+ *		append-only relation.
+ *
+ * Rows are pulled one at a time through AOScanNext(); every column the plan
+ * needs (ao->proj[i]) is copied into its column vector. Returns the scan
+ * slot: cleared when no rows were produced, otherwise marked as a virtual
+ * tuple with all ncols valid and tb->nrows holding the row count.
+ */
+TupleTableSlot *
+AppendOnlyVScanNext(ScanState *scanState)
+{
+	TupleTableSlot *slot = scanState->ss_ScanTupleSlot;
+	TupleBatch	tb = (TupleBatch) slot->PRIVATE_tb;
+	VectorizedState *vs = scanState->ps.vectorized;
+
+	/* The relation was exhausted by an earlier call: report end of scan. */
+	if (vs->ao->isDone)
+	{
+		ExecClearTuple(slot);
+		return slot;
+	}
+
+	for (tb->nrows = 0; tb->nrows < tb->batchsize; tb->nrows++)
+	{
+		slot = AOScanNext(scanState);
+		if (TupIsNull(slot))
+			break;
+
+		for (int i = 0; i < tb->ncols; i++)
+		{
+			Form_pg_attribute attr;
+			Oid			hawqVTypeID;
+			Datum	   *ptr;
+			bool	   *isnull;
+
+			/* Skip columns the plan never references. */
+			if (!vs->ao->proj[i])
+				continue;
+
+			/* One descriptor for type, by-value flag and length alike. */
+			attr = slot->tts_tupleDescriptor->attrs[i];
+			hawqVTypeID = GetVtype(attr->atttypid);
+			if (!tb->datagroup[i])
+				tbCreateColumn(tb, i, hawqVTypeID);
+
+			/* gettypeptr() returns, as a Datum, the address of row nrows' value cell. */
+			ptr = (Datum *) DatumGetPointer(
+				GetVFunc(hawqVTypeID)->gettypeptr(tb->datagroup[i], tb->nrows));
+			isnull = &(tb->datagroup[i]->isnull[tb->nrows]);
+			*ptr = slot_getattr(slot, i + 1, isnull);
+
+			/*
+			 * By-reference values point into the AO table buffer, which may be
+			 * released before the vectorized batch is consumed, so deep-copy
+			 * them. A NULL has no payload and its Datum is not a valid pointer,
+			 * so it must not be handed to datumCopy().
+			 *
+			 * NOTE(review): the copies are allocated in CurrentMemoryContext and
+			 * never freed here; confirm that context is reset per batch,
+			 * otherwise memory grows with the size of the scan.
+			 */
+			if (!attr->attbyval && !*isnull)
+				*ptr = datumCopy(*ptr, attr->attbyval, attr->attlen);
+		}
+	}
+
+	/* After the last row AOScanNext() returned NULL; fall back to the scan slot. */
+	if (!slot)
+		slot = scanState->ss_ScanTupleSlot;
+
+	if (tb->nrows == 0)
+		ExecClearTuple(slot);
+	else
+	{
+		/*
+		 * Record the batch length in each materialized column vector, as
+		 * parquet_reader.c does (header->dim = tb->nrows), so a short final
+		 * batch is not reported with the previous batch's length.
+		 * NOTE(review): assumes vheader->dim is the row count consumers read.
+		 */
+		for (int i = 0; i < tb->ncols; i++)
+		{
+			if (vs->ao->proj[i] && tb->datagroup[i])
+				tb->datagroup[i]->dim = tb->nrows;
+		}
+		TupSetVirtualTupleNValid(slot, tb->ncols);
+	}
+	return slot;
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/ao_reader.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/ao_reader.h b/contrib/vexecutor/ao_reader.h
new file mode 100644
index 0000000..5f0f91c
--- /dev/null
+++ b/contrib/vexecutor/ao_reader.h
@@ -0,0 +1,18 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/*
+ * ao_reader.h
+ *	  Vectorized scan entry points for append-only (AO) tables.
+ *
+ * These functions fill the APPENDONLYSCAN slot of the vexecutor's
+ * scan-method table in place of the row-at-a-time AO scan callbacks.
+ */
+#ifndef VEXECUTOR_AO_READER_H
+#define VEXECUTOR_AO_READER_H
+
+#include "postgres.h"
+
+#include "access/heapam.h"
+#include "cdb/cdbappendonlyam.h"
+#include "cdb/cdbvars.h"
+#include "executor/execdebug.h"
+#include "executor/nodeAppendOnlyscan.h"
+
+/* Begin a vectorized AO scan and record which columns the plan needs. */
+extern void BeginVScanAppendOnlyRelation(ScanState *scanState);
+
+/*
+ * Return the scan slot holding the next TupleBatch, or a cleared slot once
+ * the relation is exhausted.
+ */
+extern TupleTableSlot *AppendOnlyVScanNext(ScanState *scanState);
+
+/* End a vectorized AO scan and release its per-scan state. */
+extern void EndVScanAppendOnlyRelation(ScanState *scanState);
+
+#endif   /* VEXECUTOR_AO_READER_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/execVScan.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/execVScan.c b/contrib/vexecutor/execVScan.c
index 44a6854..8779e14 100644
--- a/contrib/vexecutor/execVScan.c
+++ b/contrib/vexecutor/execVScan.c
@@ -21,6 +21,7 @@
 #include "miscadmin.h"
 #include "execVQual.h"
 #include "parquet_reader.h"
+#include "ao_reader.h"
 
 static TupleTableSlot*
 ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd);
@@ -35,8 +36,8 @@ getVScanMethod(int tableType)
                     },
                     //APPENDONLYSCAN
                     {
-                            &AppendOnlyScanNext, &BeginScanAppendOnlyRelation, &EndScanAppendOnlyRelation,
-                            &ReScanAppendOnlyRelation, &MarkRestrNotAllowed, &MarkRestrNotAllowed
+                            &AppendOnlyVScanNext, &BeginVScanAppendOnlyRelation, &EndVScanAppendOnlyRelation,
+                                                                                 NULL,NULL,NULL
                     },
                     //PARQUETSCAN
                     {
@@ -130,6 +131,7 @@ ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd)
      * storage allocated in the previous tuple cycle.
      */
     econtext = node->ps.ps_ExprContext;
+
     ResetExprContext(econtext);
 
     /*
@@ -182,8 +184,7 @@ ExecVScan(ScanState *node, ExecScanAccessMtd accessMtd)
                  * and return it.
                  */
                 ((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->nrows = ((TupleBatch)slot->PRIVATE_tb)->nrows;
-                memcpy(((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->skip,
-                       ((TupleBatch)slot->PRIVATE_tb)->skip,sizeof(bool) * ((TupleBatch)slot->PRIVATE_tb)->nrows);
+                ((TupleBatch)projInfo->pi_slot->PRIVATE_tb)->skip = ((TupleBatch)slot->PRIVATE_tb)->skip;
                 return ExecVProject(projInfo, NULL);
             }
             else

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/parquet_reader.c
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/parquet_reader.c b/contrib/vexecutor/parquet_reader.c
index 301bd65..ca58a09 100644
--- a/contrib/vexecutor/parquet_reader.c
+++ b/contrib/vexecutor/parquet_reader.c
@@ -90,7 +90,7 @@ parquet_vgetnext(ParquetScanDesc scan, ScanDirection direction, TupleTableSlot *
 /*
  * Get next tuple batch from current row group into slot.
  *
- * Return false if current row group has no tuple left, true otherwise.
+ * Return the number of tuples fetched into the batch (0 once the row group is exhausted).
  */
 static int
 ParquetRowGroupReader_ScanNextTupleBatch(
@@ -105,14 +105,14 @@ ParquetRowGroupReader_ScanNextTupleBatch(
 	if (rowGroupReader->rowRead >= rowGroupReader->rowCount)
 	{
 		ParquetRowGroupReader_FinishedScanRowGroup(rowGroupReader);
-		return false;
+		return 0;
 	}
 
 	/*
 	 * get the next item (tuple) from the row group
 	 */
 	int ncol = slot->tts_tupleDescriptor->natts;
-    TupleBatch tb = (TupleBatch )slot->PRIVATE_tb;
+	TupleBatch tb = (TupleBatch )slot->PRIVATE_tb;
 
 	tb->nrows = 0;
 	if (rowGroupReader->rowRead + tb->batchsize > rowGroupReader->rowCount) {
@@ -131,12 +131,12 @@ ParquetRowGroupReader_ScanNextTupleBatch(
 			continue;
 
 		Oid hawqTypeID = tupDesc->attrs[i]->atttypid;
-        Oid hawqVTypeID = GetVtype(hawqTypeID);
+		Oid hawqVTypeID = GetVtype(hawqTypeID);
 		if(!tb->datagroup[i])
 			tbCreateColumn(tb,i,hawqVTypeID);
 
 		vheader* header = tb->datagroup[i];
-        header->dim = tb->nrows;
+		header->dim = tb->nrows;
 
 		ParquetColumnReader *nextReader =
 			&rowGroupReader->columnReaders[colReaderIndex];

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/contrib/vexecutor/vcheck.h
----------------------------------------------------------------------
diff --git a/contrib/vexecutor/vcheck.h b/contrib/vexecutor/vcheck.h
index 9b3d18c..bbeb9bc 100644
--- a/contrib/vexecutor/vcheck.h
+++ b/contrib/vexecutor/vcheck.h
@@ -24,21 +24,27 @@
 #include "nodes/execnodes.h"
+/*
+ * Function table for one vectorized type, looked up with GetVFunc(vtype).
+ * gettypeptr(vh, n) returns, encoded as a Datum, a pointer to row n's value
+ * cell (ao_reader.c writes through it). The remaining members are
+ * presumably build/free/size/(de)serialization helpers for a vheader --
+ * NOTE(review): confirm against vadt.c.
+ */
 typedef struct vFuncMap
 {
-    Oid ntype;
-    vheader* (* vtbuild)(int n);
-    void (* vtfree)(vheader **vh);
+	Oid ntype;
+	vheader* (* vtbuild)(int n);
+	void (* vtfree)(vheader **vh);
 	Datum (* gettypeptr)(vheader *vh,int n);
 	void (* gettypevalue)(vheader *vh,int n,Datum *ptr);
-    size_t (* vtsize)(vheader *vh);
-    size_t (*serialization)(vheader* vh, unsigned char* buf);
-    Datum (*deserialization)(unsigned char* buf,size_t* len);
+	size_t (* vtsize)(vheader *vh);
+	size_t (*serialization)(vheader* vh, unsigned char* buf);
+	Datum (*deserialization)(unsigned char* buf,size_t* len);
 }vFuncMap;
 
+/*
+ * Per-scan state of the vectorized append-only reader (ao_reader.c),
+ * allocated by BeginVScanAppendOnlyRelation() and freed by
+ * EndVScanAppendOnlyRelation().
+ */
+typedef struct aoinfo {
+	/* proj[i] is true when column i (0-based) is referenced by the plan's
+	 * target list or quals and must be copied into the tuple batch */
+	bool* proj;
+	/* set once appendonlygettup() has returned no more tuples */
+	bool isDone;
+} aoinfo;
+
 /* vectorized executor state */
 typedef struct VectorizedState
 {
 	bool vectorized;
 	PlanState *parent;
+	aoinfo *ao;
 }VectorizedState;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/src/backend/access/appendonly/appendonlyam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c
index a7e88d2..e39b516 100644
--- a/src/backend/access/appendonly/appendonlyam.c
+++ b/src/backend/access/appendonly/appendonlyam.c
@@ -107,6 +107,13 @@ typedef enum AoExecutorBlockKind
 	MaxAoExecutorBlockKind /* must always be last */
 } AoExecutorBlockKind;
 
+/*
+ * Forward declaration of appendonlygettup(), defined later in this file.
+ * It is no longer static: contrib/vexecutor/ao_reader.c calls it directly to
+ * fetch one MemTuple at a time for vectorized AO scans.
+ * NOTE(review): ao_reader.c repeats this prototype by hand; declaring it in
+ * cdb/cdbappendonlyam.h would keep the two copies from drifting apart.
+ */
+MemTuple
+appendonlygettup(AppendOnlyScanDesc scan,
+				 ScanDirection dir __attribute__((unused)),
+				 int nkeys,
+				 ScanKey key,
+				 TupleTableSlot *slot);
+
 static void
 AppendOnlyExecutionReadBlock_SetSegmentFileNum(
 	AppendOnlyExecutorReadBlock		*executorReadBlock,
@@ -1214,7 +1221,7 @@ LABEL_START_GETNEXTBLOCK:
  * the scankeys.
  * ----------------
  */
-static MemTuple
+MemTuple
 appendonlygettup(AppendOnlyScanDesc scan,
 				 ScanDirection dir __attribute__((unused)),
 				 int nkeys,

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/c72e5894/src/test/feature/vexecutor/test_vexecutor.cpp
----------------------------------------------------------------------
diff --git a/src/test/feature/vexecutor/test_vexecutor.cpp b/src/test/feature/vexecutor/test_vexecutor.cpp
index 08c4300..088784b 100644
--- a/src/test/feature/vexecutor/test_vexecutor.cpp
+++ b/src/test/feature/vexecutor/test_vexecutor.cpp
@@ -102,4 +102,40 @@ TEST_F(TestVexecutor, scanframework)
 					"vexecutor/ans/scan1.ans");
 
   util.execute("drop table test1");
-}
\ No newline at end of file
+}
+
+/*
+ * scanAO: vectorized scan over an append-only, snappy-compressed table.
+ * Loads the tenk fixture, checks a plain scan works before the vectorized
+ * executor is enabled, then enables it and compares scan1.sql output with
+ * the same expected answer file the scanframework test uses.
+ */
+TEST_F(TestVexecutor, scanAO)
+{
+	hawq::test::SQLUtility util;
+
+	util.execute("drop table if exists test1");
+	util.execute("create table test1 ("
+				 "	unique1		int4,"
+				 "	unique2		int4,"
+				 "	two			int4,"
+				 "	four		int4,"
+				 "	ten			int4,"
+				 "	twenty		int4,"
+				 "	hundred		int4,"
+				 "	thousand	int4,"
+				 "	twothousand	int4,"
+				 "	fivethous	int4,"
+				 "	tenthous	int4,"
+				 "	odd			int4,"
+				 "	even		int4,"
+				 "	stringu1	name,"
+				 "	stringu2	name,"
+				 "	string4		name) WITH (appendonly = true, compresstype = SNAPPY) DISTRIBUTED RANDOMLY;");
+
+	std::string pwd = util.getTestRootPath();
+	std::string cmd = "COPY test1 FROM '" + pwd + "/vexecutor/data/tenk.data'";
+	util.execute(cmd);
+
+	/* Baseline: the AO table must be readable before vectorization is on. */
+	util.execute("select unique1 from test1");
+
+	util.execute("SET vectorized_executor_enable to on");
+	util.execSQLFile("vexecutor/sql/scan1.sql",
+					 "vexecutor/ans/scan1.ans");
+
+	util.execute("drop table test1");
+}