You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2017/12/28 06:58:41 UTC

incubator-hawq git commit: HAWQ-1565. Include Pluggable Storage Format Framework in External Table Scan

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 85fd30570 -> 76e38c53b


HAWQ-1565. Include Pluggable Storage Format Framework in External Table Scan

Rewrite the tuple construct and consume working flow in the external table, which leads to data copy cost.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/76e38c53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/76e38c53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/76e38c53

Branch: refs/heads/master
Commit: 76e38c53b9377a055e6a2db6f63dc2e984c25025
Parents: 85fd305
Author: Chiyang Wan <ch...@gmail.com>
Authored: Thu Dec 7 23:05:40 2017 +0800
Committer: Ruilong Huo <hu...@163.com>
Committed: Thu Dec 28 14:58:21 2017 +0800

----------------------------------------------------------------------
 src/backend/access/external/fileam.c    |  83 +++++++---
 src/backend/executor/nodeExternalscan.c | 228 +++++++++++++++++++++++----
 src/include/access/fileam.h             |  18 ++-
 3 files changed, 267 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/76e38c53/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 6a59b95..7b77edd 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -137,11 +137,24 @@ static FILE *g_dataSource = NULL;
 * ----------------
 */
 FileScanDesc
-external_beginscan(Relation relation, Index scanrelid, uint32 scancounter,
-			   List *uriList, List *fmtOpts, char fmtType, bool isMasterOnly,
-			   int rejLimit, bool rejLimitInRows, Oid fmterrtbl, ResultRelSegFileInfo *segfileinfo, int encoding,
-			   List *scanquals)
+external_beginscan(ExternalScan *extScan,
+                   Relation relation,
+                   ResultRelSegFileInfo *segFileInfo,
+                   int formatterType,
+                   char *formatterName)
 {
+	Index scanrelid = extScan->scan.scanrelid;
+	uint32 scancounter = extScan->scancounter;
+	List *uriList = extScan->uriList;
+	List *fmtOpts = extScan->fmtOpts;
+	char fmtType = extScan->fmtType;
+	bool isMasterOnly = extScan->isMasterOnly;
+	int rejLimit = extScan->rejLimit;
+	bool rejLimitInRows = extScan->rejLimitInRows;
+	Oid fmterrtbl = extScan->fmterrtbl;
+	int encoding = extScan->encoding;
+	List *scanquals = extScan->scan.plan.qual;
+
 	FileScanDesc scan;
 	TupleDesc	tupDesc = NULL;
 	int			attnum;
@@ -174,6 +187,9 @@ external_beginscan(Relation relation, Index scanrelid, uint32 scancounter,
 	scan->fs_file = NULL;
 	scan->fs_formatter = NULL;
 
+	scan->fs_formatter_type = formatterType;
+	scan->fs_formatter_name = formatterName;
+
 	/*
 	 * get the external URI assigned to us.
 	 *
@@ -229,6 +245,7 @@ external_beginscan(Relation relation, Index scanrelid, uint32 scancounter,
 		/* set external source (uri) */
 		scan->fs_uri = uri;
 
+		elog(LOG, "fs_uri (%d) is set as %s", segindex, uri);
 		/* NOTE: we delay actually opening the data source until external_getnext() */
 	}
 	else
@@ -272,14 +289,15 @@ external_beginscan(Relation relation, Index scanrelid, uint32 scancounter,
 
 	/* Initialize all the parsing and state variables */
 	InitParseState(scan->fs_pstate, relation, NULL, NULL, false, fmtOpts, fmtType,
-				   scan->fs_uri, rejLimit, rejLimitInRows, fmterrtbl, segfileinfo, encoding);
+	               scan->fs_uri, rejLimit, rejLimitInRows, fmterrtbl, segFileInfo, encoding);
 
-	if(fmttype_is_custom(fmtType))
-	{
-		scan->fs_formatter = (FormatterData *) palloc0 (sizeof(FormatterData));
-		initStringInfo(&scan->fs_formatter->fmt_databuf);
-		scan->fs_formatter->fmt_perrow_ctx = scan->fs_pstate->rowcontext;
-	}
+	/*
+	 * We always have custom formatter
+	 */
+	scan->fs_formatter = (FormatterData *) palloc0 (sizeof(FormatterData));
+	initStringInfo(&scan->fs_formatter->fmt_databuf);
+	scan->fs_formatter->fmt_perrow_ctx = scan->fs_pstate->rowcontext;
+	scan->fs_formatter->fmt_user_ctx = NULL;
 
 	/* Set up callback to identify error line number */
 	scan->errcontext.callback = external_scan_error_callback;
@@ -391,6 +409,15 @@ external_endscan(FileScanDesc scan)
 	}
 
 	/*
+	 * free formatter name
+	 */
+	if (scan->fs_formatter_name)
+	{
+		pfree(scan->fs_formatter_name);
+		scan->fs_formatter_name = NULL;
+	}
+
+	/*
 	 * free parse state memory
 	 */
 	if (scan->fs_pstate != NULL)
@@ -483,14 +510,17 @@ external_getnext_init(PlanState *state, ExternalScanState *es_state) {
 *		Parse a data file and return its rows in heap tuple form
 * ----------------------------------------------------------------
 */
-HeapTuple
-external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc)
+bool
+external_getnext(FileScanDesc scan,
+                 ScanDirection direction,
+                 ExternalSelectDesc desc,
+                 ScanState *ss,
+                 TupleTableSlot *slot)
 {
 	HeapTuple	tuple;
-	ScanState *ss = NULL; /* a temporary dummy for the following steps */
 
 	if (scan->fs_noop)
-		return NULL;
+		return false;
 
 	/*
 	 * open the external source (local file or http).
@@ -503,7 +533,13 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc
 	 * they are not expected (see MPP-1261). Therefore we instead do it here on the
 	 * first time around only.
 	 */
-	if (!scan->fs_file)
+
+	/*
+	 * if the formatters do not need external protocol, the framework will not
+	 * load external protocol.
+	 */
+
+	if (scan->fs_file == NULL)
 		open_external_readable_source(scan);
 
 	/* Note: no locking manipulations needed */
@@ -516,7 +552,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc
 	{
 		FILEDEBUG_2;			/* external_getnext returning EOS */
 
-		return NULL;
+		return false;
 	}
 
 	/*
@@ -526,7 +562,9 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc
 
 	pgstat_count_heap_getnext(scan->fs_rd);
 
-	return tuple;
+	ExecStoreGenericTuple(tuple, slot, true);
+
+	return true;
 }
 
 /*
@@ -2591,9 +2629,12 @@ static void parseFormatString(CopyState pstate, char *fmtstr, bool iscustom)
 		}
 
 		if (!formatter_found)
-			ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
-							errmsg("external table internal parse error: "
-									"no formatter function name found")));
+		{
+			/*
+			 * If there is no formatter option specified, use format name. So
+			 * we don't report error here.
+			 */
+		}
 
 		pstate->custom_formatter_params = l;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/76e38c53/src/backend/executor/nodeExternalscan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c
index 8f2ba88..f891a57 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -34,9 +34,12 @@
  *		ExecExternalReScan				rescans the relation
  */
 #include "postgres.h"
+#include "fmgr.h"
 
 #include "access/fileam.h"
+#include "access/filesplit.h"
 #include "access/heapam.h"
+#include "access/plugstorage.h"
 #include "cdb/cdbvars.h"
 #include "executor/execdebug.h"
 #include "executor/nodeExternalscan.h"
@@ -60,13 +63,13 @@ static TupleTableSlot *ExternalNext(ExternalScanState *node);
 static TupleTableSlot *
 ExternalNext(ExternalScanState *node)
 {
-	HeapTuple	tuple;
 	FileScanDesc scandesc;
-	Index		scanrelid;
-	EState	   *estate;
+	Index scanrelid;
+	EState *estate = NULL;
 	ScanDirection direction;
-	TupleTableSlot *slot;
-	ExternalSelectDesc externalSelectDesc;
+	TupleTableSlot *slot = NULL;
+	ExternalSelectDesc externalSelectDesc = NULL;
+	bool returnTuple = false;
 
 	/*
 	 * get information from the estate and scan state
@@ -80,8 +83,61 @@ ExternalNext(ExternalScanState *node)
 	/*
 	 * get the next tuple from the file access methods
 	 */
-	externalSelectDesc = external_getnext_init(&(node->ss.ps), node);
-	tuple = external_getnext(scandesc, direction, externalSelectDesc);
+	if (scandesc->fs_formatter_type == ExternalTableType_Invalid)
+	{
+		elog(ERROR, "invalid formatter type for external table: %s", __func__);
+	}
+	else if (scandesc->fs_formatter_type != ExternalTableType_PLUG)
+	{
+		externalSelectDesc = external_getnext_init(&(node->ss.ps), node);
+
+		returnTuple = external_getnext(scandesc, direction, externalSelectDesc,
+		                               &(node->ss), slot);
+	}
+	else
+	{
+		Assert(scandesc->fs_formatter_name);
+
+		FmgrInfo *getnextInitFunc = scandesc->fs_ps_scan_funcs.getnext_init;
+
+		if (getnextInitFunc)
+		{
+			/*
+			 * pg_strncasecmp(scandesc->fs_formatter_name, "orc", strlen("orc"))
+			 * Performance improvement for string comparison.
+			 */
+			const char *formatter_name = "orc";
+			if (*(int *)(scandesc->fs_formatter_name) != *(int *)formatter_name)
+			{
+				externalSelectDesc =
+					InvokePlugStorageFormatGetNextInit(getnextInitFunc,
+					                                   &(node->ss.ps),
+					                                   node);
+			}
+		}
+		else
+		{
+			elog(ERROR, "%s_getnext_init function was not found",
+			            scandesc->fs_formatter_name);
+		}
+
+		FmgrInfo *getnextFunc = scandesc->fs_ps_scan_funcs.getnext;
+
+		if (getnextFunc)
+		{
+			returnTuple = InvokePlugStorageFormatGetNext(getnextFunc,
+			                                             scandesc,
+			                                             direction,
+			                                             externalSelectDesc,
+			                                             &(node->ss),
+			                                             slot);
+		}
+		else
+		{
+			elog(ERROR, "%s_getnext function was not found",
+			            scandesc->fs_formatter_name);
+		}
+	}
 
 	/*
 	 * save the tuple and the buffer returned to us by the access methods in
@@ -91,11 +147,14 @@ ExternalNext(ExternalScanState *node)
 	 * that ExecStoreTuple will increment the refcount of the buffer; the
 	 * refcount will not be dropped until the tuple table slot is cleared.
 	 */
-	if (tuple)
+	if (returnTuple)
 	{
-		Gpmon_M_Incr_Rows_Out(GpmonPktFromExtScanState(node));
-		CheckSendPlanStateGpmonPkt(&node->ss.ps);
-		ExecStoreGenericTuple(tuple, slot, true);
+		/*
+		 * Perfmon is not supported any more.
+		 *
+		 * Gpmon_M_Incr_Rows_Out(GpmonPktFromExtScanState(node));
+		 * CheckSendPlanStateGpmonPkt(&node->ss.ps);
+		 */
 
 	    /*
 	     * CDB: Label each row with a synthetic ctid if needed for subquery dedup.
@@ -115,7 +174,10 @@ ExternalNext(ExternalScanState *node)
 			ExecEagerFreeExternalScan(node);
 		}
 	}
-	pfree(externalSelectDesc);
+	if (externalSelectDesc)
+	{
+		pfree(externalSelectDesc);
+	}
 
 	return slot;
 }
@@ -141,16 +203,16 @@ ExecExternalScan(ExternalScanState *node)
 
 
 /* ----------------------------------------------------------------
-*		ExecInitExternalScan
-* ----------------------------------------------------------------
-*/
+ *		ExecInitExternalScan
+ * ----------------------------------------------------------------
+ */
 ExternalScanState *
 ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
 {
 	ResultRelSegFileInfo *segfileinfo = NULL;
-	ExternalScanState *externalstate;
-	Relation	currentRelation;
-	FileScanDesc currentScanDesc;
+	ExternalScanState *externalstate = NULL;
+	Relation currentRelation = NULL;
+	FileScanDesc currentScanDesc = NULL;
 
 	Assert(outerPlan(node) == NULL);
 	Assert(innerPlan(node) == NULL);
@@ -205,19 +267,49 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
 	{
 		segfileinfo = NULL;
 	}
-	currentScanDesc = external_beginscan(currentRelation,
-									 node->scan.scanrelid,
-									 node->scancounter,
-									 node->uriList,
-									 node->fmtOpts,
-									 node->fmtType,
-									 node->isMasterOnly,
-									 node->rejLimit,
-									 node->rejLimitInRows,
-									 node->fmterrtbl,
-									 segfileinfo,
-									 node->encoding,
-									 node->scan.plan.qual);
+
+	externalstate->ss.splits = GetFileSplitsOfSegment(estate->es_plannedstmt->scantable_splits,
+	                                                  currentRelation->rd_id,
+	                                                  GetQEIndex());
+
+	int   formatterType = ExternalTableType_Invalid;
+	char *formatterName = NULL;
+	getExternalTableTypeInList(node->fmtType, node->fmtOpts,
+	                         &formatterType, &formatterName);
+
+	if (formatterType == ExternalTableType_Invalid)
+	{
+		elog(ERROR, "invalid formatter type for external table: %s", __func__);
+	}
+	else if (formatterType != ExternalTableType_PLUG)
+	{
+		currentScanDesc = external_beginscan(node, currentRelation, segfileinfo,
+		                                     formatterType, formatterName);
+	}
+	else
+	{
+		Assert(formatterName);
+
+		Oid	procOid = LookupPlugStorageValidatorFunc(formatterName,
+		                                             "beginscan");
+
+		if (OidIsValid(procOid))
+		{
+			FmgrInfo beginScanFunc;
+			fmgr_info(procOid, &beginScanFunc);
+
+			currentScanDesc = InvokePlugStorageFormatBeginScan(&beginScanFunc,
+			                                                   node,
+			                                                   &(externalstate->ss),
+			                                                   currentRelation,
+			                                                   formatterType,
+			                                                   formatterName);
+		}
+		else
+		{
+			elog(ERROR, "%s_beginscan function was not found", formatterName);
+		}
+	}
 
 	externalstate->ss.ss_currentRelation = currentRelation;
 	externalstate->ess_ScanDesc = currentScanDesc;
@@ -317,7 +409,28 @@ ExecStopExternalScan(ExternalScanState *node)
 	/*
 	 * stop the file scan
 	 */
-	external_stopscan(fileScanDesc);
+	if (fileScanDesc->fs_formatter_type == ExternalTableType_Invalid)
+	{
+		elog(ERROR, "invalid formatter type for external table: %s", __func__);
+	}
+	else if (fileScanDesc->fs_formatter_type != ExternalTableType_PLUG)
+	{
+		external_stopscan(fileScanDesc);
+	}
+	else
+	{
+		FmgrInfo *stopScanFunc = fileScanDesc->fs_ps_scan_funcs.stopscan;
+
+		if (stopScanFunc)
+		{
+			InvokePlugStorageFormatStopScan(stopScanFunc, fileScanDesc);
+		}
+		else
+		{
+			elog(ERROR, "%s_stopscan function was not found",
+			            fileScanDesc->fs_formatter_name);
+		}
+	}
 }
 
 
@@ -355,7 +468,30 @@ ExecExternalReScan(ExternalScanState *node, ExprContext *exprCtxt)
 
 	ItemPointerSet(&node->cdb_fake_ctid, 0, 0);
 
-	external_rescan(fileScan);
+	if (fileScan->fs_formatter_type == ExternalTableType_Invalid)
+	{
+		elog(ERROR, "invalid formatter type for external table: %s", __func__);
+	}
+	else if (fileScan->fs_formatter_type != ExternalTableType_PLUG)
+	{
+		external_rescan(fileScan);
+	}
+	else
+	{
+		Assert(fileScan->fs_formatter_name);
+
+		FmgrInfo *rescanFunc = fileScan->fs_ps_scan_funcs.rescan;
+
+		if (rescanFunc)
+		{
+			InvokePlugStorageFormatReScan(rescanFunc, fileScan);
+		}
+		else
+		{
+			elog(ERROR, "%s_rescan function was not found",
+			            fileScan->fs_formatter_name);
+		}
+	}
 }
 
 void
@@ -379,5 +515,29 @@ void
 ExecEagerFreeExternalScan(ExternalScanState *node)
 {
 	Assert(node->ess_ScanDesc != NULL);
-	external_endscan(node->ess_ScanDesc);
+
+	FileScanDesc fileScanDesc = node->ess_ScanDesc;
+
+	if (fileScanDesc->fs_formatter_type == ExternalTableType_Invalid)
+	{
+		elog(ERROR, "invalid formatter type for external table: %s", __func__);
+	}
+	else if (fileScanDesc->fs_formatter_type != ExternalTableType_PLUG)
+	{
+		external_endscan(fileScanDesc);
+	}
+	else
+	{
+		FmgrInfo *endScanFunc = fileScanDesc->fs_ps_scan_funcs.endscan;
+
+		if (endScanFunc)
+		{
+			InvokePlugStorageFormatEndScan(endScanFunc, fileScanDesc);
+		}
+		else
+		{
+			elog(ERROR, "%s_endscan function was not found",
+			            fileScanDesc->fs_formatter_name);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/76e38c53/src/include/access/fileam.h
----------------------------------------------------------------------
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index aee9979..6a38deb 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -82,17 +82,21 @@ typedef enum DataLineStatus
 	END_MARKER
 } DataLineStatus;
 
-extern FileScanDesc external_beginscan(Relation relation, Index scanrelid,
-								   uint32 scancounter, List *uriList,
-								   List *fmtOpts, char fmtType, bool isMasterOnly,
-								   int rejLimit, bool rejLimitInRows,
-								   Oid fmterrtbl, ResultRelSegFileInfo *segfileinfo, int encoding,
-								   List *scanquals);
+extern FileScanDesc external_beginscan(ExternalScan *extScan,
+                                       Relation currentRelation,
+                                       ResultRelSegFileInfo *segFileInfo,
+                                       int formatterType,
+                                       char *formatterName);
 extern void external_rescan(FileScanDesc scan);
 extern void external_endscan(FileScanDesc scan);
 extern void external_stopscan(FileScanDesc scan);
 extern ExternalSelectDesc external_getnext_init(PlanState *state, ExternalScanState *es_state);
-extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc);
+extern bool external_getnext(FileScanDesc scan,
+                             ScanDirection direction,
+                             ExternalSelectDesc desc,
+                             ScanState *ss,
+                             TupleTableSlot *slot);
+
 extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno);
 extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);
 extern void external_insert_finish(ExternalInsertDesc extInsertDesc);