You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2017/12/28 06:58:41 UTC
incubator-hawq git commit: HAWQ-1565. Include Pluggable Storage Format Framework in External Table Scan
Repository: incubator-hawq
Updated Branches:
refs/heads/master 85fd30570 -> 76e38c53b
HAWQ-1565. Include Pluggable Storage Format Framework in External Table Scan
Rewrite the existing tuple construction and consumption workflow in the external table, which leads to data copy cost.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/76e38c53
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/76e38c53
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/76e38c53
Branch: refs/heads/master
Commit: 76e38c53b9377a055e6a2db6f63dc2e984c25025
Parents: 85fd305
Author: Chiyang Wan <ch...@gmail.com>
Authored: Thu Dec 7 23:05:40 2017 +0800
Committer: Ruilong Huo <hu...@163.com>
Committed: Thu Dec 28 14:58:21 2017 +0800
----------------------------------------------------------------------
src/backend/access/external/fileam.c | 83 +++++++---
src/backend/executor/nodeExternalscan.c | 228 +++++++++++++++++++++++----
src/include/access/fileam.h | 18 ++-
3 files changed, 267 insertions(+), 62 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/76e38c53/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 6a59b95..7b77edd 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -137,11 +137,24 @@ static FILE *g_dataSource = NULL;
* ----------------
*/
FileScanDesc
-external_beginscan(Relation relation, Index scanrelid, uint32 scancounter,
- List *uriList, List *fmtOpts, char fmtType, bool isMasterOnly,
- int rejLimit, bool rejLimitInRows, Oid fmterrtbl, ResultRelSegFileInfo *segfileinfo, int encoding,
- List *scanquals)
+external_beginscan(ExternalScan *extScan,
+ Relation relation,
+ ResultRelSegFileInfo *segFileInfo,
+ int formatterType,
+ char *formatterName)
{
+ Index scanrelid = extScan->scan.scanrelid;
+ uint32 scancounter = extScan->scancounter;
+ List *uriList = extScan->uriList;
+ List *fmtOpts = extScan->fmtOpts;
+ char fmtType = extScan->fmtType;
+ bool isMasterOnly = extScan->isMasterOnly;
+ int rejLimit = extScan->rejLimit;
+ bool rejLimitInRows = extScan->rejLimitInRows;
+ Oid fmterrtbl = extScan->fmterrtbl;
+ int encoding = extScan->encoding;
+ List *scanquals = extScan->scan.plan.qual;
+
FileScanDesc scan;
TupleDesc tupDesc = NULL;
int attnum;
@@ -174,6 +187,9 @@ external_beginscan(Relation relation, Index scanrelid, uint32 scancounter,
scan->fs_file = NULL;
scan->fs_formatter = NULL;
+ scan->fs_formatter_type = formatterType;
+ scan->fs_formatter_name = formatterName;
+
/*
* get the external URI assigned to us.
*
@@ -229,6 +245,7 @@ external_beginscan(Relation relation, Index scanrelid, uint32 scancounter,
/* set external source (uri) */
scan->fs_uri = uri;
+ elog(LOG, "fs_uri (%d) is set as %s", segindex, uri);
/* NOTE: we delay actually opening the data source until external_getnext() */
}
else
@@ -272,14 +289,15 @@ external_beginscan(Relation relation, Index scanrelid, uint32 scancounter,
/* Initialize all the parsing and state variables */
InitParseState(scan->fs_pstate, relation, NULL, NULL, false, fmtOpts, fmtType,
- scan->fs_uri, rejLimit, rejLimitInRows, fmterrtbl, segfileinfo, encoding);
+ scan->fs_uri, rejLimit, rejLimitInRows, fmterrtbl, segFileInfo, encoding);
- if(fmttype_is_custom(fmtType))
- {
- scan->fs_formatter = (FormatterData *) palloc0 (sizeof(FormatterData));
- initStringInfo(&scan->fs_formatter->fmt_databuf);
- scan->fs_formatter->fmt_perrow_ctx = scan->fs_pstate->rowcontext;
- }
+ /*
+ * We always have custom formatter
+ */
+ scan->fs_formatter = (FormatterData *) palloc0 (sizeof(FormatterData));
+ initStringInfo(&scan->fs_formatter->fmt_databuf);
+ scan->fs_formatter->fmt_perrow_ctx = scan->fs_pstate->rowcontext;
+ scan->fs_formatter->fmt_user_ctx = NULL;
/* Set up callback to identify error line number */
scan->errcontext.callback = external_scan_error_callback;
@@ -391,6 +409,15 @@ external_endscan(FileScanDesc scan)
}
/*
+ * free formatter name
+ */
+ if (scan->fs_formatter_name)
+ {
+ pfree(scan->fs_formatter_name);
+ scan->fs_formatter_name = NULL;
+ }
+
+ /*
* free parse state memory
*/
if (scan->fs_pstate != NULL)
@@ -483,14 +510,17 @@ external_getnext_init(PlanState *state, ExternalScanState *es_state) {
* Parse a data file and return its rows in heap tuple form
* ----------------------------------------------------------------
*/
-HeapTuple
-external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc)
+bool
+external_getnext(FileScanDesc scan,
+ ScanDirection direction,
+ ExternalSelectDesc desc,
+ ScanState *ss,
+ TupleTableSlot *slot)
{
HeapTuple tuple;
- ScanState *ss = NULL; /* a temporary dummy for the following steps */
if (scan->fs_noop)
- return NULL;
+ return false;
/*
* open the external source (local file or http).
@@ -503,7 +533,13 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc
* they are not expected (see MPP-1261). Therefore we instead do it here on the
* first time around only.
*/
- if (!scan->fs_file)
+
+ /*
+ * if the formatters do not need external protocol, the framework will not
+ * load external protocol.
+ */
+
+ if (scan->fs_file == NULL)
open_external_readable_source(scan);
/* Note: no locking manipulations needed */
@@ -516,7 +552,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc
{
FILEDEBUG_2; /* external_getnext returning EOS */
- return NULL;
+ return false;
}
/*
@@ -526,7 +562,9 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc
pgstat_count_heap_getnext(scan->fs_rd);
- return tuple;
+ ExecStoreGenericTuple(tuple, slot, true);
+
+ return true;
}
/*
@@ -2591,9 +2629,12 @@ static void parseFormatString(CopyState pstate, char *fmtstr, bool iscustom)
}
if (!formatter_found)
- ereport(ERROR, (errcode(ERRCODE_GP_INTERNAL_ERROR),
- errmsg("external table internal parse error: "
- "no formatter function name found")));
+ {
+ /*
+ * If there is no formatter option specified, use format name. So
+ * we don't report error here.
+ */
+ }
pstate->custom_formatter_params = l;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/76e38c53/src/backend/executor/nodeExternalscan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c
index 8f2ba88..f891a57 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -34,9 +34,12 @@
* ExecExternalReScan rescans the relation
*/
#include "postgres.h"
+#include "fmgr.h"
#include "access/fileam.h"
+#include "access/filesplit.h"
#include "access/heapam.h"
+#include "access/plugstorage.h"
#include "cdb/cdbvars.h"
#include "executor/execdebug.h"
#include "executor/nodeExternalscan.h"
@@ -60,13 +63,13 @@ static TupleTableSlot *ExternalNext(ExternalScanState *node);
static TupleTableSlot *
ExternalNext(ExternalScanState *node)
{
- HeapTuple tuple;
FileScanDesc scandesc;
- Index scanrelid;
- EState *estate;
+ Index scanrelid;
+ EState *estate = NULL;
ScanDirection direction;
- TupleTableSlot *slot;
- ExternalSelectDesc externalSelectDesc;
+ TupleTableSlot *slot = NULL;
+ ExternalSelectDesc externalSelectDesc = NULL;
+ bool returnTuple = false;
/*
* get information from the estate and scan state
@@ -80,8 +83,61 @@ ExternalNext(ExternalScanState *node)
/*
* get the next tuple from the file access methods
*/
- externalSelectDesc = external_getnext_init(&(node->ss.ps), node);
- tuple = external_getnext(scandesc, direction, externalSelectDesc);
+ if (scandesc->fs_formatter_type == ExternalTableType_Invalid)
+ {
+ elog(ERROR, "invalid formatter type for external table: %s", __func__);
+ }
+ else if (scandesc->fs_formatter_type != ExternalTableType_PLUG)
+ {
+ externalSelectDesc = external_getnext_init(&(node->ss.ps), node);
+
+ returnTuple = external_getnext(scandesc, direction, externalSelectDesc,
+ &(node->ss), slot);
+ }
+ else
+ {
+ Assert(scandesc->fs_formatter_name);
+
+ FmgrInfo *getnextInitFunc = scandesc->fs_ps_scan_funcs.getnext_init;
+
+ if (getnextInitFunc)
+ {
+ /*
+ * pg_strncasecmp(scandesc->fs_formatter_name, "orc", strlen("orc"))
+ * Performance improvement for string comparison.
+ */
+ const char *formatter_name = "orc";
+ if (*(int *)(scandesc->fs_formatter_name) != *(int *)formatter_name)
+ {
+ externalSelectDesc =
+ InvokePlugStorageFormatGetNextInit(getnextInitFunc,
+ &(node->ss.ps),
+ node);
+ }
+ }
+ else
+ {
+ elog(ERROR, "%s_getnext_init function was not found",
+ scandesc->fs_formatter_name);
+ }
+
+ FmgrInfo *getnextFunc = scandesc->fs_ps_scan_funcs.getnext;
+
+ if (getnextFunc)
+ {
+ returnTuple = InvokePlugStorageFormatGetNext(getnextFunc,
+ scandesc,
+ direction,
+ externalSelectDesc,
+ &(node->ss),
+ slot);
+ }
+ else
+ {
+ elog(ERROR, "%s_getnext function was not found",
+ scandesc->fs_formatter_name);
+ }
+ }
/*
* save the tuple and the buffer returned to us by the access methods in
@@ -91,11 +147,14 @@ ExternalNext(ExternalScanState *node)
* that ExecStoreTuple will increment the refcount of the buffer; the
* refcount will not be dropped until the tuple table slot is cleared.
*/
- if (tuple)
+ if (returnTuple)
{
- Gpmon_M_Incr_Rows_Out(GpmonPktFromExtScanState(node));
- CheckSendPlanStateGpmonPkt(&node->ss.ps);
- ExecStoreGenericTuple(tuple, slot, true);
+ /*
+ * Perfmon is not supported any more.
+ *
+ * Gpmon_M_Incr_Rows_Out(GpmonPktFromExtScanState(node));
+ * CheckSendPlanStateGpmonPkt(&node->ss.ps);
+ */
/*
* CDB: Label each row with a synthetic ctid if needed for subquery dedup.
@@ -115,7 +174,10 @@ ExternalNext(ExternalScanState *node)
ExecEagerFreeExternalScan(node);
}
}
- pfree(externalSelectDesc);
+ if (externalSelectDesc)
+ {
+ pfree(externalSelectDesc);
+ }
return slot;
}
@@ -141,16 +203,16 @@ ExecExternalScan(ExternalScanState *node)
/* ----------------------------------------------------------------
-* ExecInitExternalScan
-* ----------------------------------------------------------------
-*/
+ * ExecInitExternalScan
+ * ----------------------------------------------------------------
+ */
ExternalScanState *
ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
{
ResultRelSegFileInfo *segfileinfo = NULL;
- ExternalScanState *externalstate;
- Relation currentRelation;
- FileScanDesc currentScanDesc;
+ ExternalScanState *externalstate = NULL;
+ Relation currentRelation = NULL;
+ FileScanDesc currentScanDesc = NULL;
Assert(outerPlan(node) == NULL);
Assert(innerPlan(node) == NULL);
@@ -205,19 +267,49 @@ ExecInitExternalScan(ExternalScan *node, EState *estate, int eflags)
{
segfileinfo = NULL;
}
- currentScanDesc = external_beginscan(currentRelation,
- node->scan.scanrelid,
- node->scancounter,
- node->uriList,
- node->fmtOpts,
- node->fmtType,
- node->isMasterOnly,
- node->rejLimit,
- node->rejLimitInRows,
- node->fmterrtbl,
- segfileinfo,
- node->encoding,
- node->scan.plan.qual);
+
+ externalstate->ss.splits = GetFileSplitsOfSegment(estate->es_plannedstmt->scantable_splits,
+ currentRelation->rd_id,
+ GetQEIndex());
+
+ int formatterType = ExternalTableType_Invalid;
+ char *formatterName = NULL;
+ getExternalTableTypeInList(node->fmtType, node->fmtOpts,
+ &formatterType, &formatterName);
+
+ if (formatterType == ExternalTableType_Invalid)
+ {
+ elog(ERROR, "invalid formatter type for external table: %s", __func__);
+ }
+ else if (formatterType != ExternalTableType_PLUG)
+ {
+ currentScanDesc = external_beginscan(node, currentRelation, segfileinfo,
+ formatterType, formatterName);
+ }
+ else
+ {
+ Assert(formatterName);
+
+ Oid procOid = LookupPlugStorageValidatorFunc(formatterName,
+ "beginscan");
+
+ if (OidIsValid(procOid))
+ {
+ FmgrInfo beginScanFunc;
+ fmgr_info(procOid, &beginScanFunc);
+
+ currentScanDesc = InvokePlugStorageFormatBeginScan(&beginScanFunc,
+ node,
+ &(externalstate->ss),
+ currentRelation,
+ formatterType,
+ formatterName);
+ }
+ else
+ {
+ elog(ERROR, "%s_beginscan function was not found", formatterName);
+ }
+ }
externalstate->ss.ss_currentRelation = currentRelation;
externalstate->ess_ScanDesc = currentScanDesc;
@@ -317,7 +409,28 @@ ExecStopExternalScan(ExternalScanState *node)
/*
* stop the file scan
*/
- external_stopscan(fileScanDesc);
+ if (fileScanDesc->fs_formatter_type == ExternalTableType_Invalid)
+ {
+ elog(ERROR, "invalid formatter type for external table: %s", __func__);
+ }
+ else if (fileScanDesc->fs_formatter_type != ExternalTableType_PLUG)
+ {
+ external_stopscan(fileScanDesc);
+ }
+ else
+ {
+ FmgrInfo *stopScanFunc = fileScanDesc->fs_ps_scan_funcs.stopscan;
+
+ if (stopScanFunc)
+ {
+ InvokePlugStorageFormatStopScan(stopScanFunc, fileScanDesc);
+ }
+ else
+ {
+ elog(ERROR, "%s_stopscan function was not found",
+ fileScanDesc->fs_formatter_name);
+ }
+ }
}
@@ -355,7 +468,30 @@ ExecExternalReScan(ExternalScanState *node, ExprContext *exprCtxt)
ItemPointerSet(&node->cdb_fake_ctid, 0, 0);
- external_rescan(fileScan);
+ if (fileScan->fs_formatter_type == ExternalTableType_Invalid)
+ {
+ elog(ERROR, "invalid formatter type for external table: %s", __func__);
+ }
+ else if (fileScan->fs_formatter_type != ExternalTableType_PLUG)
+ {
+ external_rescan(fileScan);
+ }
+ else
+ {
+ Assert(fileScan->fs_formatter_name);
+
+ FmgrInfo *rescanFunc = fileScan->fs_ps_scan_funcs.rescan;
+
+ if (rescanFunc)
+ {
+ InvokePlugStorageFormatReScan(rescanFunc, fileScan);
+ }
+ else
+ {
+ elog(ERROR, "%s_rescan function was not found",
+ fileScan->fs_formatter_name);
+ }
+ }
}
void
@@ -379,5 +515,29 @@ void
ExecEagerFreeExternalScan(ExternalScanState *node)
{
Assert(node->ess_ScanDesc != NULL);
- external_endscan(node->ess_ScanDesc);
+
+ FileScanDesc fileScanDesc = node->ess_ScanDesc;
+
+ if (fileScanDesc->fs_formatter_type == ExternalTableType_Invalid)
+ {
+ elog(ERROR, "invalid formatter type for external table: %s", __func__);
+ }
+ else if (fileScanDesc->fs_formatter_type != ExternalTableType_PLUG)
+ {
+ external_endscan(fileScanDesc);
+ }
+ else
+ {
+ FmgrInfo *endScanFunc = fileScanDesc->fs_ps_scan_funcs.endscan;
+
+ if (endScanFunc)
+ {
+ InvokePlugStorageFormatEndScan(endScanFunc, fileScanDesc);
+ }
+ else
+ {
+ elog(ERROR, "%s_endscan function was not found",
+ fileScanDesc->fs_formatter_name);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/76e38c53/src/include/access/fileam.h
----------------------------------------------------------------------
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index aee9979..6a38deb 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -82,17 +82,21 @@ typedef enum DataLineStatus
END_MARKER
} DataLineStatus;
-extern FileScanDesc external_beginscan(Relation relation, Index scanrelid,
- uint32 scancounter, List *uriList,
- List *fmtOpts, char fmtType, bool isMasterOnly,
- int rejLimit, bool rejLimitInRows,
- Oid fmterrtbl, ResultRelSegFileInfo *segfileinfo, int encoding,
- List *scanquals);
+extern FileScanDesc external_beginscan(ExternalScan *extScan,
+ Relation currentRelation,
+ ResultRelSegFileInfo *segFileInfo,
+ int formatterType,
+ char *formatterName);
extern void external_rescan(FileScanDesc scan);
extern void external_endscan(FileScanDesc scan);
extern void external_stopscan(FileScanDesc scan);
extern ExternalSelectDesc external_getnext_init(PlanState *state, ExternalScanState *es_state);
-extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc);
+extern bool external_getnext(FileScanDesc scan,
+ ScanDirection direction,
+ ExternalSelectDesc desc,
+ ScanState *ss,
+ TupleTableSlot *slot);
+
extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno);
extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);
extern void external_insert_finish(ExternalInsertDesc extInsertDesc);