You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2018/08/14 02:15:39 UTC
[4/4] incubator-hawq git commit: Add hdfs protocol for pluggable
storage framework
Add hdfs protocol for pluggable storage framework
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/1c189fc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/1c189fc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/1c189fc1
Branch: refs/heads/master
Commit: 1c189fc12bf357bed36a4ef85d973a49eb26cd28
Parents: 9f33d8d
Author: oushu1wangziming1 <wa...@oushu.io>
Authored: Tue Jul 17 15:43:36 2018 +0800
Committer: Ruilong Huo <hu...@163.com>
Committed: Tue Aug 14 10:14:46 2018 +0800
----------------------------------------------------------------------
contrib/Makefile | 1 +
contrib/exthdfs/Makefile | 34 +
contrib/exthdfs/common.h | 38 +
contrib/exthdfs/exthdfs.c | 469 ++++++++
contrib/extprotocol/gpextprotocol.c | 2 +-
src/backend/access/external/fileam.c | 740 ++++++++----
src/backend/access/external/plugstorage.c | 30 +-
src/backend/catalog/cdb_external_extensions.sql | 12 +
src/backend/catalog/heap.c | 79 +-
src/backend/cdb/cdbdatalocality.c | 434 ++++++-
src/backend/cdb/cdbpartition.c | 24 +-
src/backend/commands/analyze.c | 417 ++++++-
src/backend/commands/copy.c | 22 +-
src/backend/commands/indexcmds.c | 1 +
src/backend/commands/sequence.c | 22 +-
src/backend/commands/tablecmds.c | 870 +++++++++++---
src/backend/commands/typecmds.c | 19 +-
src/backend/commands/user.c | 35 +-
src/backend/commands/view.c | 19 +-
src/backend/executor/execDML.c | 20 +-
src/backend/nodes/copyfuncs.c | 40 +-
src/backend/nodes/equalfuncs.c | 39 +-
src/backend/nodes/outfast.c | 40 +-
src/backend/nodes/outfuncs.c | 43 +-
src/backend/nodes/readfast.c | 60 +-
src/backend/nodes/readfuncs.c | 44 +-
src/backend/optimizer/plan/createplan.c | 44 +-
src/backend/optimizer/plan/planner.c | 10 +-
src/backend/parser/analyze.c | 1098 +++++++++++++-----
src/backend/parser/gram.y | 178 ++-
src/backend/tcop/utility.c | 48 +-
src/backend/utils/misc/uriparser.c | 10 +-
src/include/access/fileam.h | 13 +-
src/include/access/formatter.h | 10 +
src/include/access/plugstorage.h | 6 +-
src/include/catalog/pg_exttable.h | 7 +-
src/include/cdb/cdbdatalocality.h | 11 +-
src/include/commands/tablecmds.h | 4 +-
src/include/nodes/parsenodes.h | 118 +-
src/include/parser/analyze.h | 50 +-
src/include/utils/uri.h | 5 +-
41 files changed, 4016 insertions(+), 1150 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/Makefile
----------------------------------------------------------------------
diff --git a/contrib/Makefile b/contrib/Makefile
index 695e92a..e5daff9 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -9,6 +9,7 @@ WANTED_DIRS = \
extprotocol \
gp_cancel_query \
formatter_fixedwidth \
+ exthdfs\
hawq-hadoop
ifeq ($(with_pgcrypto), yes)
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/exthdfs/Makefile
----------------------------------------------------------------------
diff --git a/contrib/exthdfs/Makefile b/contrib/exthdfs/Makefile
new file mode 100644
index 0000000..e247664
--- /dev/null
+++ b/contrib/exthdfs/Makefile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+MODULE_big = exthdfs
+OBJS = exthdfs.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS = $(libpq_pgport)
+
+# libhdfs3 is a linker input: PGXS adds SHLIB_LINK to the module's link line.
+SHLIB_LINK += -lhdfs3
+ifdef USE_PGXS
+PGXS := $(shell pg_config --pgxs)
+include $(PGXS)
+else
+subdir = contrib/exthdfs
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/exthdfs/common.h
----------------------------------------------------------------------
diff --git a/contrib/exthdfs/common.h b/contrib/exthdfs/common.h
new file mode 100644
index 0000000..4111909
--- /dev/null
+++ b/contrib/exthdfs/common.h
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef EXTHDFS_COMMON_H
+#define EXTHDFS_COMMON_H
+/* Guard has no leading underscore: _[A-Z]... identifiers are reserved in C. */
+#include "postgres.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "access/extprotocol.h"
+#include "access/fileam.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_exttable.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "miscadmin.h"
+
+#include <fcntl.h>
+
+#endif /* EXTHDFS_COMMON_H */
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/exthdfs/exthdfs.c
----------------------------------------------------------------------
diff --git a/contrib/exthdfs/exthdfs.c b/contrib/exthdfs/exthdfs.c
new file mode 100644
index 0000000..1378734
--- /dev/null
+++ b/contrib/exthdfs/exthdfs.c
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "postgres.h"
+
+#include "common.h"
+#include "access/extprotocol.h"
+#include "cdb/cdbdatalocality.h"
+#include "storage/fd.h"
+#include "storage/filesystem.h"
+#include "utils/uri.h"
+
+
+
+
+PG_MODULE_MAGIC;
+
+/* Entry points of the hdfs protocol, using the version-1 calling convention. */
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS);
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS);
+PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation);
+PG_FUNCTION_INFO_V1(hdfsprotocol_validate);
+
+/*
+ * hdfsprotocol_blocklocation - block-location callback of the hdfs protocol.
+ *
+ * Lists every location directory on HDFS and, for each non-empty file,
+ * deep-copies its block locations into a blocklocation_file appended to the
+ * ExtProtocolBlockLocationData returned through fcinfo->resultinfo.  All
+ * locations must share the first one's name node (hdfsprotocol_validate
+ * checks this).  elog(ERROR) does not return, so the HDFS connection and the
+ * libhdfs3-owned arrays are released before each error is raised.
+ */
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
+{
+    ExtProtocolValidatorData *pvalidator_data =
+        (ExtProtocolValidatorData *) (fcinfo->context);
+    ExtProtocolBlockLocationData *bldata;
+    char       *first_uri_str;
+    Uri        *nn_uri;
+    hdfsFS      fs;
+    ListCell   *lc = NULL;
+
+    /* Build the result instance; palloc0 reports out-of-memory itself. */
+    bldata = palloc0(sizeof(ExtProtocolBlockLocationData));
+    bldata->type = T_ExtProtocolBlockLocationData;
+    fcinfo->resultinfo = bldata;
+
+    /* All locations use the same name node; take it from the first one. */
+    first_uri_str = (char *) strVal(lfirst(list_head(pvalidator_data->url_list)));
+    nn_uri = ParseExternalTableUri(first_uri_str);
+    if (nn_uri == NULL)
+    {
+        elog(ERROR, "hdfsprotocol_blocklocation : "
+             "invalid URI encountered %s", first_uri_str);
+    }
+
+    elog(DEBUG3, "hdfsprotocol_blocklocation : "
+         "extracted HDFS name node address %s:%d",
+         nn_uri->hostname, nn_uri->port);
+
+    /* Create file system instance */
+    fs = hdfsConnect(nn_uri->hostname, nn_uri->port);
+    if (fs == NULL)
+    {
+        elog(ERROR, "hdfsprotocol_blocklocation : "
+             "failed to create HDFS instance to connect to %s:%d",
+             nn_uri->hostname, nn_uri->port);
+    }
+
+    /* Clean up uri instance as we don't need it any longer */
+    FreeExternalTableUri(nn_uri);
+
+    /* Check all locations to get files to fetch location. */
+    foreach(lc, pvalidator_data->url_list)
+    {
+        char         *url = (char *) strVal(lfirst(lc));
+        Uri          *uri = ParseExternalTableUri(url);
+        hdfsFileInfo *fiarray;
+        int           nsize = 0;
+        int           i;
+
+        if (uri == NULL)
+        {
+            hdfsDisconnect(fs);
+            elog(ERROR, "hdfsprotocol_blocklocation : "
+                 "invalid URI encountered %s", url);
+        }
+
+        /* NOTICE: only directories are supported as locations for now. */
+        fiarray = hdfsListDirectory(fs, uri->path, &nsize);
+        if (fiarray == NULL)
+        {
+            hdfsDisconnect(fs);
+            elog(ERROR, "hdfsprotocol_blocklocation : "
+                 "failed to get files of path %s",
+                 uri->path);
+        }
+
+        /* Call block location api to get data location for each file */
+        for (i = 0; i < nsize; i++)
+        {
+            hdfsFileInfo  *fi = &fiarray[i];
+            /* The listed entry name is used as the file's full path. */
+            char          *fullpath = pstrdup(fi->mName);
+            int64_t        len = fi->mSize;
+            int            numOfBlock = 0;
+            BlockLocation *bla;
+            blocklocation_file *blf;
+            int            bidx;
+
+            elog(DEBUG3, "hdfsprotocol_blocklocation : "
+                 "built full path file %s", fullpath);
+            elog(DEBUG3, "hdfsprotocol_blocklocation : "
+                 "got file %s length " INT64_FORMAT,
+                 fullpath, len);
+
+            /* Empty files have no blocks to report. */
+            if (len == 0)
+            {
+                pfree(fullpath);
+                continue;
+            }
+
+            /* Get block location data for this file */
+            bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len, &numOfBlock);
+            if (bla == NULL)
+            {
+                hdfsFreeFileInfo(fiarray, nsize);
+                hdfsDisconnect(fs);
+                elog(ERROR, "hdfsprotocol_blocklocation : "
+                     "failed to get block location of path %s. "
+                     "It is reported generally due to HDFS service errors or "
+                     "another session's ongoing writing.",
+                     fullpath);
+            }
+
+            /* Add file full path and its block number as result. */
+            blf = palloc0(sizeof(blocklocation_file));
+            blf->file_uri = fullpath;   /* the result now owns fullpath */
+            blf->block_num = numOfBlock;
+            blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num);
+
+            elog(DEBUG3, "hdfsprotocol_blocklocation : file %s has %d blocks",
+                 fullpath, blf->block_num);
+
+            /* Deep-copy each block so the result outlives the lib's array. */
+            for (bidx = 0; bidx < blf->block_num; bidx++)
+            {
+                BlockLocation *blo = &bla[bidx];
+                BlockLocation *bl = &(blf->locations[bidx]);
+                int            nidx;
+
+                bl->numOfNodes = blo->numOfNodes;
+                bl->hosts = (char **) palloc0(sizeof(char *) * bl->numOfNodes);
+                bl->names = (char **) palloc0(sizeof(char *) * bl->numOfNodes);
+                bl->topologyPaths = (char **) palloc0(sizeof(char *) * bl->numOfNodes);
+                bl->offset = blo->offset;
+                bl->length = blo->length;
+                bl->corrupt = blo->corrupt;
+
+                /* Copy node nidx of THIS block; indexing blo[nidx] would read
+                 * other blocks' entries and can run past the end of bla. */
+                for (nidx = 0; nidx < bl->numOfNodes; nidx++)
+                {
+                    bl->hosts[nidx] = pstrdup(blo->hosts[nidx]);
+                    bl->names[nidx] = pstrdup(blo->names[nidx]);
+                    bl->topologyPaths[nidx] = pstrdup(blo->topologyPaths[nidx]);
+                }
+            }
+
+            bldata->files = lappend(bldata->files, (void *) (blf));
+
+            /* Clean up block location instances created by the lib. */
+            hdfsFreeFileBlockLocations(bla, numOfBlock);
+        }
+
+        /* Clean up URI instance in loop as we don't need it any longer */
+        FreeExternalTableUri(uri);
+
+        /* Clean up file info array created by the lib for this location. */
+        hdfsFreeFileInfo(fiarray, nsize);
+    }
+
+    /* destroy fs instance */
+    hdfsDisconnect(fs);
+    PG_RETURN_VOID();
+}
+
+/*
+ * hdfsprotocol_validate - validator callback of the hdfs protocol: checks
+ * the location count for writable tables, the formatter and its options,
+ * that all locations are hdfs:// URIs on one name node, and (with
+ * forceCreateDir) that the location exists.  Violations raise ERROR.
+ */
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS)
+{
+    ExtProtocolValidatorData *pvalidator_data =
+        (ExtProtocolValidatorData *) (fcinfo->context);
+    bool        isCsv = false;
+    bool        isText = false;
+    bool        isOrc = false;
+    char       *nnaddr = NULL;
+    int         nnport = -1;
+    ListCell   *optcell = NULL;
+    ListCell   *lc = NULL;
+
+    elog(DEBUG3, "hdfsprotocol_validate() begin");
+    if (pvalidator_data->forceCreateDir)
+        Assert(list_length(pvalidator_data->url_list) == 1);
+
+    if (pvalidator_data->direction == EXT_VALIDATE_WRITE)
+    {
+        /* accept only one directory location */
+        if (list_length(pvalidator_data->url_list) != 1)
+        {
+            ereport(ERROR,
+                    (errcode(ERRCODE_SYNTAX_ERROR),
+                     errmsg("hdfsprotocol_validate : "
+                            "only one location url is supported for writable external hdfs")));
+        }
+    }
+
+    /* Go through first round to get formatter type */
+    foreach(optcell, pvalidator_data->format_opts)
+    {
+        DefElem    *de = (DefElem *) lfirst(optcell);
+        char       *val;
+
+        if (strcasecmp(de->defname, "formatter") != 0)
+            continue;
+        val = strVal(de->arg);
+        if (strcasecmp(val, "csv") == 0)
+            isCsv = true;
+        else if (strcasecmp(val, "text") == 0)
+            isText = true;
+        else if (strcasecmp(val, "orc") == 0)
+            isOrc = true;
+    }
+
+    if (!isCsv && !isText && !isOrc)
+    {
+        ereport(ERROR,
+                (errcode(ERRCODE_SYNTAX_ERROR),
+                 errmsg("hdfsprotocol_validate : "
+                        "only 'csv', 'text' and 'orc' formatter is supported for external hdfs")));
+    }
+
+    /*
+     * Validate formatter options.  XXX: this belongs in formatter specific
+     * validation UDFs rather than in the protocol validator.
+     */
+    foreach(optcell, pvalidator_data->format_opts)
+    {
+        DefElem    *de = (DefElem *) lfirst(optcell);
+
+        if (strcasecmp(de->defname, "delimiter") == 0)
+        {
+            char       *val = strVal(de->arg);
+
+            /* Validation 1. User can not specify 'OFF' in delimiter */
+            if (strcasecmp(val, "off") == 0)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "'off' value of 'delimiter' option is not supported")));
+            }
+            /* Validation 2. Multibyte delimiters are allowed, empty ones are not */
+            if (strlen(val) < 1)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "'delimiter' option accepts multibytes characters")));
+            }
+        }
+
+        if (strcasecmp(de->defname, "escape") == 0)
+        {
+            char       *val = strVal(de->arg);
+
+            /* Validation 3. User can not specify 'OFF' in escape */
+            if (strcasecmp(val, "off") == 0)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "'off' value of 'escape' option is not supported")));
+            }
+            /* Validation 4. Can only specify one character */
+            if (strlen(val) != 1)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "'escape' option accepts single character")));
+            }
+        }
+
+        if (strcasecmp(de->defname, "newline") == 0)
+        {
+            char       *val = strVal(de->arg);
+
+            /* Validation 5. only accept 'lf', 'cr', 'crlf' */
+            if (strcasecmp(val, "lf") != 0 &&
+                strcasecmp(val, "cr") != 0 &&
+                strcasecmp(val, "crlf") != 0)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "the value of 'newline' option can only be "
+                                "'lf', 'cr' or 'crlf'")));
+            }
+        }
+
+        if (strcasecmp(de->defname, "quote") == 0)
+        {
+            char       *val;
+
+            /* This is allowed only for csv mode formatter */
+            if (!isCsv)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "'quote' option is only available in 'csv' formatter")));
+            }
+
+            val = strVal(de->arg);
+            /* Validation 6. Can only specify one character */
+            if (strlen(val) != 1)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "'quote' option accepts single character")));
+            }
+        }
+
+        if (strcasecmp(de->defname, "force_notnull") == 0)
+        {
+            /* This is allowed only for csv mode formatter */
+            if (!isCsv)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "'force_notnull' option is only available in 'csv' formatter")));
+            }
+        }
+
+        if (strcasecmp(de->defname, "force_quote") == 0)
+        {
+            /* This is allowed only for csv mode formatter */
+            if (!isCsv)
+            {
+                ereport(ERROR,
+                        (errcode(ERRCODE_SYNTAX_ERROR),
+                         errmsg("hdfsprotocol_validate : "
+                                "'force_quote' option is only available in 'csv' formatter")));
+            }
+        }
+    }
+
+    /* All urls must use the hdfs protocol and the same name node host/port. */
+    foreach(lc, pvalidator_data->url_list)
+    {
+        /* Parse current location URI. */
+        char       *url = (char *) strVal(lfirst(lc));
+        Uri        *uri = ParseExternalTableUri(url);
+
+        if (uri == NULL)
+        {
+            elog(ERROR, "hdfsprotocol_validate : "
+                 "invalid URI encountered %s", url);
+        }
+
+        if (uri->protocol != URI_HDFS)
+        {
+            elog(ERROR, "hdfsprotocol_validate : "
+                 "invalid URI protocol encountered in %s, "
+                 "hdfs:// protocol is required",
+                 url);
+        }
+
+        if (nnaddr == NULL)
+        {
+            /* First location: remember its name node for the others. */
+            nnaddr = pstrdup(uri->hostname);
+            nnport = uri->port;
+        }
+        else
+        {
+            if (strcmp(nnaddr, uri->hostname) != 0)
+            {
+                elog(ERROR, "hdfsprotocol_validate : "
+                     "different name server addresses are detected, "
+                     "both %s and %s are found",
+                     nnaddr, uri->hostname);
+            }
+            if (nnport != uri->port)
+            {
+                elog(ERROR, "hdfsprotocol_validate : "
+                     "different name server ports are detected, "
+                     "both %d and %d are found",
+                     nnport, uri->port);
+            }
+        }
+
+        /* forceCreateDir: the location is only checked to exist, not created. */
+        if (pvalidator_data->forceCreateDir)
+        {
+            hdfsFS      fs;
+
+            elog(LOG, "hdfs_validator() forced creating dir");
+
+            /* Create file system instance */
+            fs = hdfsConnect(uri->hostname, uri->port);
+            if (fs == NULL)
+            {
+                elog(ERROR, "hdfsprotocol_validate : "
+                     "failed to create HDFS instance to connect to %s:%d",
+                     uri->hostname, uri->port);
+            }
+
+            if (hdfsExists(fs, uri->path) == -1)
+            {
+                /* elog(ERROR) does not return: release the connection first */
+                hdfsDisconnect(fs);
+                elog(ERROR, "hdfsprotocol_validate : "
+                     "Location \"%s\" is not exist",
+                     uri->path);
+            }
+
+            /* destroy fs instance */
+            hdfsDisconnect(fs);
+        }
+
+        /* Clean up temporarily created instances */
+        FreeExternalTableUri(uri);
+    }
+
+    /* Freed only after the loop: every iteration compares against nnaddr. */
+    if (nnaddr != NULL)
+        pfree(nnaddr);
+
+    elog(LOG, "passed validating hdfs protocol options");
+
+    PG_RETURN_VOID();
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/extprotocol/gpextprotocol.c
----------------------------------------------------------------------
diff --git a/contrib/extprotocol/gpextprotocol.c b/contrib/extprotocol/gpextprotocol.c
index 419b923..07f1731 100644
--- a/contrib/extprotocol/gpextprotocol.c
+++ b/contrib/extprotocol/gpextprotocol.c
@@ -324,5 +324,5 @@ void FreeDemoUri(DemoUri *uri)
if (uri->protocol)
pfree(uri->protocol);
- pfree(uri);
+ FreeExternalTableUri(uri);
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 692a5db..0d2527d 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -86,14 +86,17 @@ static void InitParseState(CopyState pstate, Relation relation,
char *uri, int rejectlimit,
bool islimitinrows, Oid fmterrtbl, ResultRelSegFileInfo *segfileinfo, int encoding);
-static void FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
- int nArgs,
- CopyState pstate,
- FormatterData* formatter,
- Relation rel,
- TupleDesc tupDesc,
- FmgrInfo *convFuncs,
- Oid *typioparams);
+static void
+FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
+ int nArgs,
+ CopyState pstate,
+ FormatterData *formatter,
+ Relation rel,
+ TupleDesc tupDesc,
+ FmgrInfo *convFuncs,
+ Oid *typioparams,
+ char *url,
+ ScanState *ss);
static void open_external_readable_source(FileScanDesc scan);
static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
@@ -305,12 +308,54 @@ external_beginscan(ExternalScan *extScan,
scan->errcontext.previous = error_context_stack;
//pgstat_initstats(relation);
-
+ external_populate_formatter_actionmask(scan->fs_pstate, scan->fs_formatter);
return scan;
}
/* ----------------
+ * external_populate_formatter_actionmask
+ * ----------------
+ */
+void external_populate_formatter_actionmask(CopyState pstate,
+                                             FormatterData *formatter)
+{
+    FunctionCallInfoData fcinfo;
+
+    /* We just call the formatter in function to populate the mask */
+    formatter->fmt_mask = FMT_UNSET;
+
+    /* No custom formatter: data always flows through an external buffer. */
+    if (pstate->custom_formatter_func == NULL)
+    {
+        formatter->fmt_mask |= FMT_NEEDEXTBUFF;
+        elog(LOG, "external scan needs an external protocol to cooperate");
+        return;
+    }
+
+    /*
+     * Probe call with zero arguments and no relation or tuple descriptor:
+     * the formatter is expected to report its needs through
+     * formatter->fmt_mask.  Its return value is not used.
+     */
+    FunctionCallPrepareFormatter(&fcinfo,
+                                 0,
+                                 pstate,
+                                 formatter,
+                                 NULL, NULL, NULL, NULL, NULL, NULL);
+    (void) FunctionCallInvoke(&fcinfo);
+
+    if (formatter->fmt_mask & FMT_NEEDEXTBUFF)
+    {
+        elog(LOG, "external scan needs an external protocol to cooperate");
+    }
+    else
+    {
+        elog(LOG, "external scan needs only formatter to manipulate data");
+    }
+}
+
+/* ----------------
* external_rescan - (re)start a scan of an external file
* ----------------
*/
@@ -525,13 +570,15 @@ external_getnext(FileScanDesc scan,
/*
* open the external source (local file or http).
*
- * NOTE: external_beginscan() seems like the natural place for this call. However,
- * in queries with more than one gang each gang will initialized all the nodes
- * of the plan (but actually executed only the nodes in it's local slice)
- * This means that external_beginscan() (and external_endscan() too) will get called
- * more than needed and we'll end up opening too many http connections when
- * they are not expected (see MPP-1261). Therefore we instead do it here on the
- * first time around only.
+ * NOTE: external_beginscan() seems like the natural place for this call.
+ * However, in queries with more than one gang each gang will initialized
+ * all the nodes of the plan (but actually executed only the nodes in it's
+ * local slice)
+ *
+ * This means that external_beginscan() (and external_endscan() too) will
+ * get called more than needed and we'll end up opening too many http
+ * connections when they are not expected (see MPP-1261). Therefore we
+ * instead do it here on the first time around only.
*/
/*
@@ -539,7 +586,7 @@ external_getnext(FileScanDesc scan,
* load external protocol.
*/
- if (scan->fs_file == NULL)
+ if (scan->fs_file == NULL && (scan->fs_formatter->fmt_mask & FMT_NEEDEXTBUFF))
open_external_readable_source(scan);
/* Note: no locking manipulations needed */
@@ -567,6 +614,7 @@ external_getnext(FileScanDesc scan,
return true;
}
+
/*
* external_insert_init
*
@@ -575,7 +623,7 @@ external_getnext(FileScanDesc scan,
*/
ExternalInsertDesc
external_insert_init(Relation rel, int errAosegno,
- ExternalTableType formatterType, char *formatterName)
+ int formatterType, char *formatterName, PlannedStmt* plannedstmt)
{
ExternalInsertDesc extInsertDesc;
ExtTableEntry* extentry;
@@ -634,8 +682,12 @@ external_insert_init(Relation rel, int errAosegno,
/* get a url to use. we use seg number modulo total num of urls */
v = list_nth(extentry->locations, my_url);
uri_str = pstrdup(v->val.str);
+ Uri* uri = ParseExternalTableUri(uri_str);
+
extInsertDesc->ext_uri = uri_str;
+ FreeExternalTableUri(uri);
+
/*elog(NOTICE, "seg %d got url number %d: %s", segindex, my_url, uri_str);*/
}
@@ -668,6 +720,10 @@ external_insert_init(Relation rel, int errAosegno,
{
extInsertDesc->ext_formatter_data = (FormatterData *) palloc0 (sizeof(FormatterData));
extInsertDesc->ext_formatter_data->fmt_perrow_ctx = extInsertDesc->ext_pstate->rowcontext;
+
+ /* First call formatter in function to get action mask */
+ external_populate_formatter_actionmask(extInsertDesc->ext_pstate,
+ extInsertDesc->ext_formatter_data);
}
return extInsertDesc;
@@ -687,7 +743,6 @@ external_insert_init(Relation rel, int errAosegno,
Oid
external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
{
-
HeapTuple instup = ExecFetchSlotHeapTuple(tupTableSlot);
TupleDesc tupDesc = extInsertDesc->ext_tupDesc;
Datum* values = extInsertDesc->ext_values;
@@ -695,14 +750,18 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
CopyStateData* pstate = extInsertDesc->ext_pstate;
bool customFormat = extInsertDesc->ext_pstate->custom;
-
if(extInsertDesc->ext_noop)
return InvalidOid;
/* Open our output file or output stream if not yet open */
- if(!extInsertDesc->ext_file && !extInsertDesc->ext_noop)
+ if(!extInsertDesc->ext_file &&
+ !extInsertDesc->ext_noop &&
+ (extInsertDesc->ext_formatter_data == NULL ||
+ (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF)))
+ {
open_external_writable_source(extInsertDesc);
+ }
/*
* deconstruct the tuple and format it into text
@@ -730,15 +789,34 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
/* must have been created during insert_init */
Assert(formatter);
- /* per call formatter prep */
- FunctionCallPrepareFormatter(&fcinfo,
- 1,
- pstate,
- formatter,
- extInsertDesc->ext_rel,
- extInsertDesc->ext_tupDesc,
- pstate->out_functions,
- NULL);
+ if ((formatter->fmt_mask & FMT_NEEDEXTBUFF) == 0)
+ {
+ /* per call formatter prep */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 0,
+ pstate,
+ formatter,
+ extInsertDesc->ext_rel,
+ extInsertDesc->ext_tupDesc,
+ pstate->out_functions,
+ NULL,
+ extInsertDesc->ext_uri,
+ NULL);
+ }
+ else
+ {
+ /* per call formatter prep */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 1,
+ pstate,
+ formatter,
+ extInsertDesc->ext_rel,
+ extInsertDesc->ext_tupDesc,
+ pstate->out_functions,
+ NULL,
+ NULL,
+ NULL);
+ }
/* Mark the correct record type in the passed tuple */
HeapTupleHeaderSetTypeId(instup->t_data, tupDesc->tdtypeid);
@@ -748,18 +826,22 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
fcinfo.argnull[0] = false;
d = FunctionCallInvoke(&fcinfo);
+
MemoryContextReset(formatter->fmt_perrow_ctx);
- /* We do not expect a null result */
- if (fcinfo.isnull)
- elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
+ if (formatter->fmt_mask & FMT_NEEDEXTBUFF)
+ {
+ /* We do not expect a null result */
+ if (fcinfo.isnull)
+ elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
- b = DatumGetByteaP(d);
+ b = DatumGetByteaP(d);
- CopyOneCustomRowTo(pstate, b);
+ CopyOneCustomRowTo(pstate, b);
+ }
}
- if (extInsertDesc->ext_formatter_data == NULL)
+ if (extInsertDesc->ext_formatter_data == NULL || (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF))
{
/* Write the data into the external source */
external_senddata((URL_FILE*)extInsertDesc->ext_file, pstate);
@@ -769,7 +851,6 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
pstate->fe_msgbuf->data[0] = '\0';
}
pstate->processed++;
-
return HeapTupleGetOid(instup);
}
@@ -782,6 +863,28 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
void
external_insert_finish(ExternalInsertDesc extInsertDesc)
{
+ /* Tell formatter to close */
+ if (extInsertDesc->ext_formatter_data != NULL &&
+ (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF) == 0)
+ {
+ Datum d;
+ FunctionCallInfoData fcinfo;
+
+ extInsertDesc->ext_formatter_data->fmt_mask |= FMT_WRITE_END;
+
+ /* per call formatter prep */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 0,
+ extInsertDesc->ext_pstate,
+ extInsertDesc->ext_formatter_data,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL,
+ NULL);
+ d = FunctionCallInvoke(&fcinfo);
+ }
/*
* Close the external source
@@ -805,6 +908,7 @@ external_insert_finish(ExternalInsertDesc extInsertDesc)
pfree(extInsertDesc);
}
+
/* ==========================================================================
* The follwing macros aid in major refactoring of data processing code (in
* externalgettup() ). We use macros because in some cases the code must be in
@@ -1042,102 +1146,102 @@ static DataLineStatus parse_next_line(FileScanDesc scan)
static HeapTuple
externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
{
- HeapTuple tuple = NULL;
- CopyState pstate = scan->fs_pstate;
- bool needData = false;
+ HeapTuple tuple = NULL;
+ CopyState pstate = scan->fs_pstate;
+ bool needData = false;
- /* If we either got things to read or stuff to process */
- while (!pstate->fe_eof || !pstate->raw_buf_done)
+ /* If we either got things to read or stuff to process */
+ while (!pstate->fe_eof || !pstate->raw_buf_done)
+ {
+ /* need to fill our buffer with data? */
+ if (pstate->raw_buf_done)
{
- /* need to fill our buffer with data? */
- if (pstate->raw_buf_done)
- {
- pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
- pstate->begloc = pstate->raw_buf;
- pstate->raw_buf_done = (pstate->bytesread==0);
- pstate->raw_buf_index = 0;
+ pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
+ pstate->begloc = pstate->raw_buf;
+ pstate->raw_buf_done = (pstate->bytesread==0);
+ pstate->raw_buf_index = 0;
- /* on first time around just throw the header line away */
- if (pstate->header_line && pstate->bytesread > 0)
+ /* on first time around just throw the header line away */
+ if (pstate->header_line && pstate->bytesread > 0)
+ {
+ PG_TRY();
+ {
+ readHeaderLine(pstate);
+ }
+ PG_CATCH();
{
- PG_TRY();
+ /*
+ * got here? encoding conversion error occurred on the
+ * header line (first row).
+ */
+ if (pstate->errMode == ALL_OR_NOTHING)
{
- readHeaderLine(pstate);
+ PG_RE_THROW();
}
- PG_CATCH();
+ else
{
+ /* SREH - release error state */
+ if (!elog_dismiss(DEBUG5))
+ PG_RE_THROW(); /* hope to never get here! */
+
/*
- * got here? encoding conversion error occurred on the
- * header line (first row).
+ * note: we don't bother doing anything special here.
+ * we are never interested in logging a header line
+ * error. just continue the workflow.
*/
- if (pstate->errMode == ALL_OR_NOTHING)
- {
- PG_RE_THROW();
- }
- else
- {
- /* SREH - release error state */
- if (!elog_dismiss(DEBUG5))
- PG_RE_THROW(); /* hope to never get here! */
-
- /*
- * note: we don't bother doing anything special here.
- * we are never interested in logging a header line
- * error. just continue the workflow.
- */
- }
}
- PG_END_TRY();
-
- EXT_RESET_LINEBUF;
- pstate->header_line = false;
}
+ PG_END_TRY();
+
+ EXT_RESET_LINEBUF;
+ pstate->header_line = false;
}
+ }
- /* while there is still data in our buffer */
- while (!pstate->raw_buf_done || needData)
- {
- DataLineStatus ret_mode = parse_next_line(scan);
+ /* while there is still data in our buffer */
+ while (!pstate->raw_buf_done || needData)
+ {
+ DataLineStatus ret_mode = parse_next_line(scan);
- if(ret_mode == LINE_OK)
- {
- /* convert to heap tuple */
- /* XXX This is bad code. Planner should be able to
- * decide whether we need heaptuple or memtuple upstream,
- * so make the right decision here.
- */
- tuple = heap_form_tuple(scan->fs_tupDesc, scan->values, scan->nulls);
- pstate->processed++;
- MemoryContextReset(pstate->rowcontext);
- return tuple;
- }
- else if(ret_mode == LINE_ERROR && !pstate->raw_buf_done)
- {
- /* error was handled in parse_next_line. move to the next */
- continue;
- }
- else if(ret_mode == END_MARKER)
- {
- scan->fs_inited = false;
- return NULL;
- }
- else
- {
- /* try to get more data if possible */
- Assert((ret_mode == NEED_MORE_DATA) ||
- (ret_mode == LINE_ERROR && pstate->raw_buf_done));
- needData = true;
- break;
- }
+ if(ret_mode == LINE_OK)
+ {
+ /* convert to heap tuple */
+ /* XXX This is bad code. Planner should be able to
+ * decide whether we need heaptuple or memtuple upstream,
+ * so make the right decision here.
+ */
+ tuple = heap_form_tuple(scan->fs_tupDesc, scan->values, scan->nulls);
+ pstate->processed++;
+ MemoryContextReset(pstate->rowcontext);
+ return tuple;
+ }
+ else if(ret_mode == LINE_ERROR && !pstate->raw_buf_done)
+ {
+ /* error was handled in parse_next_line. move to the next */
+ continue;
+ }
+ else if(ret_mode == END_MARKER)
+ {
+ scan->fs_inited = false;
+ return NULL;
+ }
+ else
+ {
+ /* try to get more data if possible */
+ Assert((ret_mode == NEED_MORE_DATA) ||
+ (ret_mode == LINE_ERROR && pstate->raw_buf_done));
+ needData = true;
+ break;
}
}
+ }
- /*
- * if we got here we finished reading all the data.
- */
- scan->fs_inited = false;
+ /*
+ * if we got here we finished reading all the data.
+ */
+ scan->fs_inited = false;
- return NULL;
+ return NULL;
}
@@ -1145,149 +1249,241 @@ externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss
+/*
+ * externalgettup_custom
+ *	Return the next tuple produced by a custom formatter UDF that needs
+ *	fileam to feed it raw bytes (externalgettup() routes here when the
+ *	formatter's fmt_mask has FMT_NEEDEXTBUFF set).
+ *
+ * Data is pulled with external_getdata() in RAW_BUF_SIZE chunks and appended
+ * to formatter->fmt_databuf, then the formatter is invoked repeatedly:
+ *   - FMT_NONE: a tuple was built; it is returned.
+ *   - FMT_NEED_MORE_DATA: the buffer is compacted (justifyDatabuf) and the
+ *     next chunk is read.
+ *   - anything else: ERROR.
+ * An error thrown by the formatter is caught. The bad-row details it reported
+ * are copied into pstate (line number, byte offset, line_buf), and the buffer
+ * cursor is advanced past the bad row. FILEAM_HANDLE_ERROR and then
+ * FILEAM_IF_REJECT_LIMIT_REACHED_ABORT are applied before the next call.
+ * A HEADER option is rejected with ERROR.
+ *
+ * Returns NULL, and clears scan->fs_inited, once fmt_databuf holds no bytes
+ * and pstate->fe_eof is set.
+ */
static HeapTuple
externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
{
- HeapTuple tuple;
- CopyState pstate = scan->fs_pstate;
- FormatterData* formatter = scan->fs_formatter;
- bool no_more_data = false;
- MemoryContext oldctxt = CurrentMemoryContext;
+ HeapTuple tuple;
+ CopyState pstate = scan->fs_pstate;
+ FormatterData* formatter = scan->fs_formatter;
+ bool no_more_data = false;
+ MemoryContext oldctxt = CurrentMemoryContext;
- Assert(formatter);
+ Assert(formatter);
- /* while didn't finish processing the entire file */
- while (!no_more_data)
+ /* while didn't finish processing the entire file */
+ while (!no_more_data)
+ {
+ /* need to fill our buffer with data? */
+ if (pstate->raw_buf_done)
{
- /* need to fill our buffer with data? */
- if (pstate->raw_buf_done)
- {
- int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
- if ( bytesread > 0 )
- appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
- pstate->raw_buf_done = false;
-
- /* HEADER not yet supported ... */
- if(pstate->header_line)
- elog(ERROR, "header line in custom format is not yet supported");
- }
+ int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
+ if ( bytesread > 0 )
+ appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
+ pstate->raw_buf_done = false;
+
+ /* HEADER not yet supported ... */
+ if(pstate->header_line)
+ elog(ERROR, "header line in custom format is not yet supported");
+ }
- if (formatter->fmt_databuf.len > 0 || !pstate->fe_eof)
+ if (formatter->fmt_databuf.len > 0 || !pstate->fe_eof)
+ {
+ /* while there is still data in our buffer */
+ while (!pstate->raw_buf_done)
{
- /* while there is still data in our buffer */
- while (!pstate->raw_buf_done)
+ bool error_caught = false;
+
+ /*
+ * Invoke the custom formatter function.
+ */
+ PG_TRY();
{
- bool error_caught = false;
+ Datum d;
+ FunctionCallInfoData fcinfo;
+
+ /* per call formatter prep; url and ss become fmt_url and fmt_splits */
+ FunctionCallPrepareFormatter(&fcinfo,
+ 0,
+ pstate,
+ formatter,
+ scan->fs_rd,
+ scan->fs_tupDesc,
+ scan->in_functions,
+ scan->typioparams,
+ ((URL_FILE *)(scan->fs_file))->url,
+ ss);
+ d = FunctionCallInvoke(&fcinfo);
+
+ }
+ PG_CATCH();
+ {
+ error_caught = true;
+
+ MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
/*
- * Invoke the custom formatter function.
+ * Save any bad row information that was set
+ * by the user in the formatter UDF (if any).
+ * Then handle the error in FILEAM_HANDLE_ERROR.
*/
- PG_TRY();
- {
- Datum d;
- FunctionCallInfoData fcinfo;
-
- /* per call formatter prep */
- FunctionCallPrepareFormatter(&fcinfo,
- 0,
- pstate,
- formatter,
- scan->fs_rd,
- scan->fs_tupDesc,
- scan->in_functions,
- scan->typioparams);
- d = FunctionCallInvoke(&fcinfo);
+ pstate->cur_lineno = formatter->fmt_badrow_num;
+ pstate->cur_byteno = formatter->fmt_bytesread;
+ resetStringInfo(&pstate->line_buf);
- }
- PG_CATCH();
+ if (formatter->fmt_badrow_len > 0)
{
- error_caught = true;
-
- MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
-
- /*
- * Save any bad row information that was set
- * by the user in the formatter UDF (if any).
- * Then handle the error in FILEAM_HANDLE_ERROR.
- */
- pstate->cur_lineno = formatter->fmt_badrow_num;
- pstate->cur_byteno = formatter->fmt_bytesread;
- resetStringInfo(&pstate->line_buf);
-
- if (formatter->fmt_badrow_len > 0)
+ if (formatter->fmt_badrow_data)
+ appendBinaryStringInfo(&pstate->line_buf,
+ formatter->fmt_badrow_data,
+ formatter->fmt_badrow_len);
+
+ /* skip the bad row, clamping the cursor to the buffer bounds */
+ formatter->fmt_databuf.cursor += formatter->fmt_badrow_len;
+ if (formatter->fmt_databuf.cursor > formatter->fmt_databuf.len ||
+ formatter->fmt_databuf.cursor < 0 )
{
- if (formatter->fmt_badrow_data)
- appendBinaryStringInfo(&pstate->line_buf,
- formatter->fmt_badrow_data,
- formatter->fmt_badrow_len);
-
- formatter->fmt_databuf.cursor += formatter->fmt_badrow_len;
- if (formatter->fmt_databuf.cursor > formatter->fmt_databuf.len ||
- formatter->fmt_databuf.cursor < 0 )
- {
- formatter->fmt_databuf.cursor = formatter->fmt_databuf.len;
- }
+ formatter->fmt_databuf.cursor = formatter->fmt_databuf.len;
}
-
- FILEAM_HANDLE_ERROR;
-
- MemoryContextSwitchTo(oldctxt);
}
- PG_END_TRY();
- /*
- * Examine the function results. If an error was caught
- * we already handled it, so after checking the reject
- * limit, loop again and call the UDF for the next tuple.
- */
- if (!error_caught)
+ FILEAM_HANDLE_ERROR;
+
+ MemoryContextSwitchTo(oldctxt);
+ }
+ PG_END_TRY();
+
+ /*
+ * Examine the function results. If an error was caught
+ * we already handled it, so after checking the reject
+ * limit, loop again and call the UDF for the next tuple.
+ */
+ if (!error_caught)
+ {
+ switch (formatter->fmt_notification)
{
- switch (formatter->fmt_notification)
- {
- case FMT_NONE:
+ case FMT_NONE:
- /* got a tuple back */
+ /* got a tuple back */
- tuple = formatter->fmt_tuple;
- pstate->processed++;
- MemoryContextReset(formatter->fmt_perrow_ctx);
+ tuple = formatter->fmt_tuple;
+ pstate->processed++;
+ MemoryContextReset(formatter->fmt_perrow_ctx);
- return tuple;
+ return tuple;
- case FMT_NEED_MORE_DATA:
+ case FMT_NEED_MORE_DATA:
- /*
- * Callee consumed all data in the buffer.
- * Prepare to read more data into it.
- */
- pstate->raw_buf_done = true;
- justifyDatabuf(&formatter->fmt_databuf);
+ /*
+ * Callee consumed all data in the buffer.
+ * Prepare to read more data into it.
+ */
+ pstate->raw_buf_done = true;
+ justifyDatabuf(&formatter->fmt_databuf);
- continue;
+ continue;
- default:
- elog(ERROR, "unsupported formatter notification (%d)",
- formatter->fmt_notification);
- break;
- }
- }
- else
- {
- FILEAM_IF_REJECT_LIMIT_REACHED_ABORT
+ default:
+ elog(ERROR, "unsupported formatter notification (%d)",
+ formatter->fmt_notification);
+ break;
}
-
}
- }
- else
- {
- no_more_data = true;
+ else
+ {
+ FILEAM_IF_REJECT_LIMIT_REACHED_ABORT
+ }
}
}
+ else
+ {
+ /* nothing left in fmt_databuf and fe_eof is set: end of input */
+ no_more_data = true;
+ }
+ }
+
+ /*
+ * if we got here we finished reading all the data.
+ */
+ Assert(no_more_data);
+ scan->fs_inited = false;
+
+ return NULL;
+}
+
+/*
+ * externalgettup_custom_noextprot
+ *	Return the next tuple from a custom formatter UDF that does not need
+ *	fileam to fill a data buffer for it. externalgettup() routes here when
+ *	the formatter's fmt_mask lacks FMT_NEEDEXTBUFF.
+ *
+ * Unlike externalgettup_custom(), nothing is read into fmt_databuf here. The
+ * UDF is handed scan->fs_uri and the scan state, and is called once per
+ * result:
+ *   - FMT_NONE: a tuple was built; it is returned.
+ *   - FMT_DONE: the scan ends; NULL is returned and scan->fs_inited cleared.
+ *   - anything else: ERROR.
+ * An error thrown by the UDF goes through FILEAM_HANDLE_ERROR. If that does
+ * not re-throw, the scan is aborted with "formatter reported error".
+ *
+ * desc is unused; it keeps the signature parallel to the other
+ * externalgettup_* variants.
+ */
+static HeapTuple
+externalgettup_custom_noextprot(FileScanDesc scan,
+								ExternalSelectDesc desc,
+								ScanState *ss)
+{
+	HeapTuple	tuple;
+	CopyState	pstate = scan->fs_pstate;
+	FormatterData *formatter = scan->fs_formatter;
+	bool		no_more_data = false;
+	MemoryContext oldctxt = CurrentMemoryContext;
+
+	Assert(formatter);
+
+	/* keep calling the formatter until it reports FMT_DONE */
+	while (!no_more_data)
+	{
+		bool		error_caught = false;
/*
- * if we got here we finished reading all the data.
+		 * Invoke the custom formatter function.
*/
- Assert(no_more_data);
- scan->fs_inited = false;
+		PG_TRY();
+		{
+			FunctionCallInfoData fcinfo;
+
+			/* per call formatter prep */
+			FunctionCallPrepareFormatter(&fcinfo,
+										 0,
+										 pstate,
+										 formatter,
+										 scan->fs_rd,
+										 scan->fs_tupDesc,
+										 scan->in_functions,
+										 scan->typioparams,
+										 scan->fs_uri,
+										 ss);
+
+			/*
+			 * The returned Datum is ignored: the formatter reports its
+			 * outcome through fmt_notification / fmt_tuple.
+			 */
+			(void) FunctionCallInvoke(&fcinfo);
- return NULL;
+		}
+		PG_CATCH();
+		{
+			error_caught = true;
+			MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
+			FILEAM_HANDLE_ERROR;
+			MemoryContextSwitchTo(oldctxt);
+		}
+		PG_END_TRY();
+
+		if (!error_caught)
+		{
+			switch (formatter->fmt_notification)
+			{
+				case FMT_NONE:
+					/* got a tuple back */
+					tuple = formatter->fmt_tuple;
+					pstate->processed++;
+					MemoryContextReset(formatter->fmt_perrow_ctx);
+					return tuple;
+
+				case FMT_DONE:
+					no_more_data = true;
+					break;
+
+				default:
+					elog(ERROR, "unsupported formatter notification (%d)",
+						 formatter->fmt_notification);
+					break;
+			}
+		}
+		else
+		{
+			/*
+			 * Reached only if FILEAM_HANDLE_ERROR returned instead of
+			 * re-throwing. No bad-row position is recorded on this path, so
+			 * the scan cannot resume; escalate to an ERROR.
+			 */
+			ereport(ERROR,
+					(errcode(ERRCODE_EXTERNAL_ROUTINE_INVOCATION_EXCEPTION),
+					 (errmsg("formatter reported error")),
+					 errOmitLocation(true)));
+		}
+	}
+
+	/*
+	 * if we got here we finished reading all the data.
+	 */
+	Assert(no_more_data);
+	scan->fs_inited = false;
+	return NULL;
}
/* ----------------
@@ -1322,11 +1518,19 @@ externalgettup(FileScanDesc scan,
/* (set current state...) */
}
+ /***********************************************************
+ * This version has always custom formatter and fs defined.
+ ***********************************************************/
if (!custom)
- return externalgettup_defined(scan, desc, ss); /* text/csv */
+ return externalgettup_defined(scan, desc, ss); // text/csv
+ else if (scan->fs_formatter->fmt_mask & FMT_NEEDEXTBUFF)
+ {
+ return externalgettup_custom(scan, desc, ss);
+ }
else
- return externalgettup_custom(scan, desc, ss); /* custom */
-
+ {
+ return externalgettup_custom_noextprot(scan, desc, ss);
+ }
}
/*
* setCustomFormatter
@@ -1346,7 +1550,23 @@ lookupCustomFormatter(char *formatter_name, bool iswritable)
Oid argList[1];
Oid returnOid;
- funcname = lappend(funcname, makeString(formatter_name));
+ char* new_formatter_name = (char *)palloc0(strlen(formatter_name) + 5);
+ if (!iswritable)
+ {
+ sprintf(new_formatter_name, "%s_in", formatter_name);
+ }
+ else
+ {
+ sprintf(new_formatter_name, "%s_out", formatter_name);
+ }
+
+ /* update to all lowercase string */
+ for ( int i = 0 ; new_formatter_name[i] != '\0' ; ++i )
+ {
+ new_formatter_name[i] = tolower(new_formatter_name[i]);
+ }
+
+ funcname = lappend(funcname, makeString(new_formatter_name));
if(iswritable)
{
@@ -1363,7 +1583,7 @@ lookupCustomFormatter(char *formatter_name, bool iswritable)
if (!OidIsValid(procOid))
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
errmsg("formatter function %s of type %s was not found.",
- formatter_name,
+ new_formatter_name,
(iswritable ? "writable" : "readable")),
errhint("Create it with CREATE FUNCTION."),
errOmitLocation(true)));
@@ -1372,7 +1592,7 @@ lookupCustomFormatter(char *formatter_name, bool iswritable)
if (get_func_rettype(procOid) != returnOid)
ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("formatter function %s of type %s has an incorrect return type",
- formatter_name,
+ new_formatter_name,
(iswritable ? "writable" : "readable")),
errOmitLocation(true)));
@@ -1381,9 +1601,12 @@ lookupCustomFormatter(char *formatter_name, bool iswritable)
ereport(ERROR,
(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
errmsg("formatter function %s is not declared STABLE.",
- formatter_name),
+ new_formatter_name),
errOmitLocation(true)));
+ if(NULL != new_formatter_name)
+ pfree(new_formatter_name);
+
return procOid;
}
@@ -1664,7 +1887,9 @@ FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
Relation rel,
TupleDesc tupDesc,
FmgrInfo *convFuncs,
- Oid *typioparams)
+ Oid *typioparams,
+ char *url,
+ ScanState *ss)
{
formatter->type = T_FormatterData;
formatter->fmt_relation = rel;
@@ -1680,6 +1905,8 @@ FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
formatter->fmt_needs_transcoding = pstate->need_transcoding;
formatter->fmt_conversion_proc = pstate->enc_conversion_proc;
formatter->fmt_external_encoding = pstate->client_encoding;
+ formatter->fmt_url = url;
+ formatter->fmt_splits = ss == NULL ? NULL : ss->splits;
InitFunctionCallInfoData(/* FunctionCallInfoData */ *fcinfo,
/* FmgrInfo */ pstate->custom_formatter_func,
@@ -1688,7 +1915,6 @@ FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
/* ResultSetInfo */ NULL);
}
-
/*
* open the external source for scanning (RET only)
*
@@ -2360,6 +2586,48 @@ strtokx2(const char *s,
return start;
}
+/*
+ * hasErrTblInFmtOpts
+ *	Return true if any whitespace-separated token of the first format
+ *	option string in fmtOpts equals "err_table" (case-insensitive).
+ *
+ * Every token is compared, values as well as keys. A NIL list has no
+ * options and yields false. strtokx2() takes a const input, so the option
+ * string is tokenized directly; no working copy is allocated (and leaked).
+ */
+bool hasErrTblInFmtOpts(List *fmtOpts) {
+	const char *whitespace = " \t\n\r";
+	int			encoding = GetDatabaseEncoding();
+	char	   *key;
+
+	/* linitial() on an empty list would dereference NULL */
+	if (fmtOpts == NIL)
+		return false;
+
+	key = strtokx2((char *) strVal(linitial(fmtOpts)), whitespace, NULL, NULL,
+				   0, false, true, encoding);
+	while (key) {
+		if (pg_strcasecmp(key, "err_table") == 0)
+			return true;
+		key = strtokx2(NULL, whitespace, NULL, NULL,
+					   0, false, false, encoding);
+	}
+	return false;
+}
+
+/*
+ * getExtTblCategoryInFmtOptsStr
+ *	Scan a format-options string of alternating key / value tokens and
+ *	return a palloc'd copy of the value paired with the key "category"
+ *	(matched exactly, case-insensitive), or NULL if there is no such pair
+ *	or fmtStr is NULL.
+ *
+ * Value tokens are read with "'" as the quote character. fmtStr is not
+ * modified (strtokx2 takes a const input).
+ */
+char *getExtTblCategoryInFmtOptsStr(char *fmtStr)
+{
+	const char *whitespace = " \t\n\r";
+	const char *quote = "'";
+	int			encoding = GetDatabaseEncoding();
+	char	   *key;
+	char	   *val;
+
+	/*
+	 * strtokx2(NULL, ...) continues the previous tokenization, so a NULL
+	 * input must not reach it or we would parse some unrelated string.
+	 */
+	if (fmtStr == NULL)
+		return NULL;
+
+	key = strtokx2(fmtStr, whitespace, NULL, NULL,
+				   0, false, true, encoding);
+	val = strtokx2(NULL, whitespace, NULL, quote,
+				   0, false, true, encoding);
+
+	while (key && val)
+	{
+		/* exact match: a prefix test would also accept e.g. "categoryx" */
+		if (pg_strcasecmp(key, "category") == 0)
+			return pstrdup(val);
+
+		key = strtokx2(NULL, whitespace, NULL, NULL,
+					   0, false, false, encoding);
+		val = strtokx2(NULL, whitespace, NULL, quote,
+					   0, false, true, encoding);
+	}
+
+	return NULL;
+}
+
char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr)
{
const char *whitespace = " \t\n\r";
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/access/external/plugstorage.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/plugstorage.c b/src/backend/access/external/plugstorage.c
index ad7d260..fe7c82a 100644
--- a/src/backend/access/external/plugstorage.c
+++ b/src/backend/access/external/plugstorage.c
@@ -479,24 +479,33 @@ void InvokePlugStorageFormatStopScan(FmgrInfo *func,
ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func,
Relation relation,
int formatterType,
- char *formatterName)
+ char *formatterName,
+ PlannedStmt *plannedstmt,
+ int segno )
{
PlugStorageData psdata;
FunctionCallInfoData fcinfo;
- psdata.type = T_PlugStorageData;
- psdata.ps_relation = relation;
- psdata.ps_formatter_type = formatterType;
- psdata.ps_formatter_name = formatterName;
+ psdata.type = T_PlugStorageData;
+ psdata.ps_relation = relation;
+ psdata.ps_formatter_type = formatterType;
+ psdata.ps_formatter_name = formatterName;
+ psdata.ps_segno = segno;
- InitFunctionCallInfoData(fcinfo,
- func,
- 0,
- (Node *)(&psdata),
- NULL);
+ psdata.ps_scan_state = palloc0(sizeof(ScanState));
+
+ InitFunctionCallInfoData(fcinfo, // FunctionCallInfoData
+ func, // FmgrInfo
+ 0, // nArgs
+ (Node *)(&psdata), // Call Context
+ NULL); // ResultSetInfo
+
+ // Invoke function
FunctionCallInvoke(&fcinfo);
+
+ // We do not expect a null result
if (fcinfo.isnull)
{
elog(ERROR, "function %u returned NULL",
@@ -505,6 +514,7 @@ ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func,
ExternalInsertDesc extInsertDesc = psdata.ps_ext_insert_desc;
+ pfree(psdata.ps_scan_state);
return extInsertDesc;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/catalog/cdb_external_extensions.sql
----------------------------------------------------------------------
diff --git a/src/backend/catalog/cdb_external_extensions.sql b/src/backend/catalog/cdb_external_extensions.sql
index e3a8080..d11797f 100644
--- a/src/backend/catalog/cdb_external_extensions.sql
+++ b/src/backend/catalog/cdb_external_extensions.sql
@@ -47,3 +47,15 @@ LANGUAGE C STABLE;
CREATE OR REPLACE FUNCTION fixedwidth_out(record) RETURNS bytea
AS '$libdir/fixedwidth.so', 'fixedwidth_out'
LANGUAGE C STABLE;
+
+------------------------------------------------------------------
+-- external HDFS
+--
+-- C entry points of the hdfs external protocol, implemented in
+-- $libdir/exthdfs.so. Both take no SQL arguments and return void.
+-- NOTE(review): they appear to be called by the backend with an
+-- ExtProtocolValidatorData call context rather than from SQL
+-- (the data-locality code builds one for the block-location call);
+-- confirm against exthdfs.c.
+------------------------------------------------------------------
+-- C symbol hdfsprotocol_validate (presumably the protocol validator)
+CREATE OR REPLACE FUNCTION hdfs_validate() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_validate'
+LANGUAGE C STABLE;
+
+-- C symbol hdfsprotocol_blocklocation; presumably the function invoked
+-- with action EXT_VALID_ACT_GETBLKLOC to fetch HDFS block locations
+CREATE OR REPLACE FUNCTION hdfs_blocklocation() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_blocklocation'
+LANGUAGE C STABLE;
+
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/catalog/heap.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 18a8acf..e159c83 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -47,7 +47,11 @@
*-------------------------------------------------------------------------
*/
#include "postgres.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "port.h"
+#include "access/fileam.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/sysattr.h"
@@ -78,38 +82,42 @@
#include "catalog/pg_statistic.h"
#include "catalog/pg_tablespace.h"
#include "catalog/pg_type.h"
-#include "cdb/cdbappendonlyam.h"
-#include "cdb/cdbpartition.h"
+
#include "cdb/cdbanalyze.h"
+#include "cdb/cdbappendonlyam.h"
+#include "cdb/cdbmirroredfilesysobj.h"
#include "cdb/cdbparquetfooterprocessor.h"
+#include "cdb/cdbparquetstoragewrite.h"
+#include "cdb/cdbpartition.h"
+#include "cdb/cdbpersistentfilesysobj.h"
+#include "cdb/cdbsharedstorageop.h"
+#include "cdb/cdbvars.h"
#include "commands/dbcommands.h"
#include "commands/tablecmds.h"
#include "commands/tablespace.h"
-#include "miscadmin.h"
+
#include "nodes/makefuncs.h"
+#include "nodes/pg_list.h"
+#include "nodes/value.h"
#include "optimizer/clauses.h"
#include "optimizer/var.h"
#include "parser/parse_coerce.h"
#include "parser/parse_expr.h"
#include "parser/parse_relation.h"
+#include "pg_config_manual.h"
+#include "storage/fd.h"
#include "storage/smgr.h"
+#include "utils/acl.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
+#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h" /* CDB: GetMemoryChunkContext */
+#include "utils/palloc.h"
#include "utils/relcache.h"
#include "utils/syscache.h"
-#include "utils/guc.h"
-#include "cdb/cdbvars.h"
-
-#include "cdb/cdbsharedstorageop.h"
-#include "cdb/cdbmirroredfilesysobj.h"
-#include "cdb/cdbpersistentfilesysobj.h"
-#include "cdb/cdbparquetstoragewrite.h"
-#include "catalog/gp_persistent.h"
-
-#include "utils/guc.h"
+#include "utils/uri.h"
typedef struct pg_result PGresult;
extern void PQclear(PGresult *res);
@@ -2544,7 +2552,52 @@ heap_drop_with_catalog(Oid relid)
* External table? If so, delete the pg_exttable tuple.
*/
if (is_external_rel)
+ {
+ /* Step 1. remove uri on file system */
+ rel = relation_open(relid, AccessExclusiveLock);
+ ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);
+ char *path = (char *) strVal(linitial(exttbl->locations));
+ char *searchKey = (char *) palloc0 (MAXPGPATH);
+ char *fileSpacePath = NULL;
+ GetFilespacePathForTablespace(get_database_dts(MyDatabaseId),
+ &fileSpacePath);
+ sprintf(searchKey, "%s/ExtErrTbl/",fileSpacePath);
+ char *match = strstr(path,searchKey);
+ if (match)
+ {
+ RemovePath(path, 1);
+ }
+
+ /* Get category for the external table */
+ List *entry_locations = exttbl->locations;
+ Assert(entry_locations);
+ ListCell *entry_location = list_head(entry_locations);
+ char *url = ((Value*)lfirst(entry_location))->val.str;
+ char *category = getExtTblCategoryInFmtOptsStr(exttbl->fmtopts);
+
+ /* Remove data for internal table */
+ if (category != NULL &&
+ pg_strncasecmp(category, "internal", strlen("internal")) == 0)
+ {
+
+
+ if (IS_HDFS_URI(url)) /* ORC, TEXT, CSV */
+ {
+ // orc, text, csv only support one location.
+ Assert(list_length(entry_locations) == 1);
+ RemovePath(url, 1);
+ }
+ }
+
+ if (category)
+ {
+ pfree(category);
+ }
+ relation_close(rel, AccessExclusiveLock);
+
+ /* Step 2. remove pg_exttable entry */
RemoveExtTableEntry(relid);
+ }
if (is_foreign_rel)
RemoveForeignTableEntry(relid);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/cdb/cdbdatalocality.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index b451f68..f7d4472 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -29,14 +29,17 @@
#include "access/genam.h"
#include "access/aomd.h"
+#include "access/extprotocol.h"
#include "access/heapam.h"
#include "access/filesplit.h"
#include "access/parquetsegfiles.h"
+#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/catquery.h"
#include "catalog/pg_exttable.h"
#include "catalog/pg_inherits.h"
#include "catalog/pg_proc.h"
+#include "catalog/pg_extprotocol.h"
#include "cdb/cdbdatalocality.h"
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
@@ -52,6 +55,7 @@
#include "optimizer/planmain.h"
#include "parser/parsetree.h"
#include "storage/fd.h"
+#include "parser/parse_func.h"
#include "postmaster/identity.h"
#include "cdb/cdbmetadatacache.h"
#include "resourcemanager/utils/network_utils.h"
@@ -87,6 +91,11 @@ typedef struct range_table_collector_context {
List *full_range_tables; /* every table include result relation */
} range_table_collector_context;
+/*
+ * Walker state for collect_scan_rangetable(): gathers the range table
+ * entries referenced by the ExternalScan, AppendOnlyScan and ParquetScan
+ * nodes of a plan tree.
+ */
+typedef struct collect_scan_rangetable_context {
+ plan_tree_base_prefix base; // plan walker prefix; base.node is set to the PlannedStmt
+ List *range_tables; // RTEs of scanned relations, one per scan node (may repeat)
+ List *full_range_tables; // the plan's complete range table (plannedstmt->rtable)
+} collect_scan_rangetable_context;
/*
* structure containing information about how much a
* host holds.
@@ -123,10 +132,14 @@ typedef struct File_Split {
int64 logiceof;
int host;
bool is_local_read;
+ char *ext_file_uri;
} File_Split;
+/* Storage kind recorded in Relation_Data.type by the data-locality code. */
typedef enum DATALOCALITY_RELATION_TYPE {
- DATALOCALITY_APPENDONLY, DATALOCALITY_PARQUET, DATALOCALITY_UNKNOWN
+ DATALOCALITY_APPENDONLY, /* append-only row table (RelationIsAoRows) */
+ DATALOCALITY_PARQUET, /* parquet table (RelationIsParquet) */
+ DATALOCALITY_HDFS, /* external table whose first location uses the hdfs protocol */
+ DATALOCALITY_UNKNOWN
} DATALOCALITY_RELATION_TYPE;
/*
@@ -290,6 +303,7 @@ typedef struct hostname_volume_stat_context {
*/
typedef struct split_to_segment_mapping_context {
range_table_collector_context rtc_context;
+ collect_scan_rangetable_context srtc_context;
data_dist_stat_context dds_context;
collect_hdfs_split_location_context chsl_context;
hostname_volume_stat_context host_context;
@@ -315,9 +329,9 @@ typedef struct split_to_segment_mapping_context {
int64 total_metadata_logic_len;
- int metadata_cache_time_us;
- int alloc_resource_time_us;
- int cal_datalocality_time_us;
+ int metadata_cache_time_us;
+ int alloc_resource_time_us;
+ int cal_datalocality_time_us;
} split_to_segment_mapping_context;
typedef struct vseg_list{
@@ -331,14 +345,19 @@ static void init_datalocality_memory_context(void);
static void init_split_assignment_result(Split_Assignment_Result *result,
int host_num);
-static void init_datalocality_context(split_to_segment_mapping_context *context);
+static void init_datalocality_context(PlannedStmt *plannedstmt,
+ split_to_segment_mapping_context *context);
static bool range_table_collector_walker(Node *node,
range_table_collector_context *context);
-static void collect_range_tables(Query *query, List* full_range_table,
+static void collect_range_tables(Query *query,
range_table_collector_context *context);
+static bool collect_scan_rangetable(Node *node,
+ collect_scan_rangetable_context *cxt);
+
+
static void convert_range_tables_to_oids_and_check_table_functions(List **range_tables, bool* isUDFExists,
MemoryContext my_memorycontext);
@@ -349,6 +368,8 @@ static void check_keep_hash_and_external_table(
static int64 get_block_locations_and_claculte_table_size(
split_to_segment_mapping_context *collector_context);
+static bool dataStoredInHdfs(Relation rel);
+
static List *get_virtual_segments(QueryResource *resource);
static List *run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, QueryResource ** resourcePtr,
@@ -368,6 +389,12 @@ static void ParquetGetSegFileDataLocation(Relation relation,
Relation_Data *rel_data, int* hitblocks,
int* allblocks, GpPolicy *targetPolicy);
+static void ExternalGetHdfsFileDataLocation(Relation relation,
+ split_to_segment_mapping_context *context, int64 splitsize,
+ Relation_Data *rel_data, int* allblocks);
+
+Oid LookupCustomProtocolBlockLocationFunc(char *protoname);
+
static BlockLocation *fetch_hdfs_data_block_location(char *filepath, int64 len,
int *block_num, RelFileNode rnode, uint32_t segno, double* hit_ratio);
@@ -464,6 +491,9 @@ static Relation_File** change_file_order_based_on_continuity(
static int64 set_maximum_segment_volume_parameter(Relation_Data *rel_data,
int host_num, double* maxSizePerSegment);
+static void InvokeHDFSProtocolBlockLocation(Oid procOid,
+ List *locs,
+ List **blockLocations);
/*
* Setup /cleanup the memory context for this run
* of data locality algorithm.
@@ -500,13 +530,17 @@ static void init_split_assignment_result(Split_Assignment_Result *result,
return;
}
-static void init_datalocality_context(split_to_segment_mapping_context *context) {
+static void init_datalocality_context(PlannedStmt *plannedstmt,
+ split_to_segment_mapping_context *context) {
context->old_memorycontext = CurrentMemoryContext;
context->datalocality_memorycontext = DataLocalityMemoryContext;
context->chsl_context.relations = NIL;
context->rtc_context.range_tables = NIL;
- context->rtc_context.full_range_tables = NIL;
+ context->rtc_context.full_range_tables = plannedstmt->rtable;
+ context->srtc_context.range_tables = NIL;
+ context->srtc_context.full_range_tables = plannedstmt->rtable;
+ context->srtc_context.base.node = (Node *)plannedstmt;
context->externTableForceSegNum = 0;
context->externTableLocationSegNum = 0;
@@ -592,7 +626,7 @@ static bool range_table_collector_walker(Node *node,
* collect_range_tables: collect all range table relations, and store
* them into the range_table_collector_context.
*/
-static void collect_range_tables(Query *query, List* full_range_table,
+static void collect_range_tables(Query *query,
range_table_collector_context *context) {
query_tree_walker(query, range_table_collector_walker, (void *) context,
@@ -613,9 +647,27 @@ static void collect_range_tables(Query *query, List* full_range_table,
}
context->range_tables = new_range_tables;
}
- context->full_range_tables = full_range_table;
return;
}
+
+/*
+ * collect_scan_rangetable
+ *	plan_tree_walker callback. For every ExternalScan, AppendOnlyScan and
+ *	ParquetScan node, fetch its range table entry (by scanrelid) from
+ *	cxt->full_range_tables and append it to cxt->range_tables. Always
+ *	recurses into the node's children and returns plan_tree_walker's result.
+ */
+static bool collect_scan_rangetable(Node *node,
+		collect_scan_rangetable_context *cxt) {
+	if (NULL == node)
+		return false;
+
+	switch (nodeTag(node)) {
+		case T_ExternalScan:
+		case T_AppendOnlyScan:
+		case T_ParquetScan: {
+			RangeTblEntry *rte = rt_fetch(((Scan *) node)->scanrelid,
+										  cxt->full_range_tables);
+			cxt->range_tables = lappend(cxt->range_tables, rte);
+			break;
+		}
+		default:
+			break;
+	}
+
+	return plan_tree_walker(node, collect_scan_rangetable, cxt);
+}
/*
*
*/
@@ -750,6 +802,19 @@ static void check_keep_hash_and_external_table(
if (uri && uri->protocol == URI_CUSTOM && is_pxf_protocol(uri)) {
isPxf = true;
}
+ else if (uri && (is_hdfs_protocol(uri))) {
+ relation_close(rel, AccessShareLock);
+ if (targetPolicy->nattrs > 0)
+ {
+ /*select the maximum hash bucket number as hashSegNum of query*/
+ if (context->hashSegNum < targetPolicy->bucketnum)
+ {
+ context->hashSegNum = targetPolicy->bucketnum;
+ context->keep_hash = true;
+ }
+ }
+ continue;
+ }
}
}
if (extEnrty->command || isPxf) {
@@ -844,37 +909,14 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
/*
* We only consider the data stored in HDFS.
*/
- if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
- Relation_Data *rel_data = NULL;
- /*
- * Get pg_appendonly information for this table.
- */
- AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
-
- rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
+ bool isDataStoredInHdfs = dataStoredInHdfs(rel);
+ if (isDataStoredInHdfs ) {
+ GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
+ Relation_Data *rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
rel_data->relid = rel_oid;
rel_data->files = NIL;
rel_data->partition_parent_relid = 0;
rel_data->block_count = 0;
-
- GpPolicy *targetPolicy = NULL;
- targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
- /*
- * Based on the pg_appendonly information, calculate the data
- * location information associated with this relation.
- */
- if (RelationIsAoRows(rel)) {
- rel_data->type = DATALOCALITY_APPENDONLY;
- AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
- aoEntry->splitsize, rel_data, &hitblocks,
- &allblocks, targetPolicy);
- } else {
- rel_data->type = DATALOCALITY_PARQUET;
- ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
- context->split_size, rel_data, &hitblocks,
- &allblocks, targetPolicy);
- }
-
bool isResultRelation = true;
ListCell *nonResultlc;
foreach(nonResultlc, context->rtc_context.range_tables)
@@ -884,6 +926,54 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
isResultRelation = false;
}
}
+
+ if (!isResultRelation) {
+ // skip the relation not in scan nodes
+ // for partition table scan optimization;
+ ListCell *rtc;
+ bool found = false;
+ foreach(rtc, context->srtc_context.range_tables) {
+ RangeTblEntry *rte = lfirst(rtc);
+ if (rel_oid == rte->relid) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ relation_close(rel, AccessShareLock);
+ continue;
+ }
+ }
+
+ if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
+ /*
+ * Get pg_appendonly information for this table.
+ */
+ AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
+ /*
+ * Based on the pg_appendonly information, calculate the data
+ * location information associated with this relation.
+ */
+ if (RelationIsAoRows(rel)) {
+ rel_data->type = DATALOCALITY_APPENDONLY;
+ AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
+ aoEntry->splitsize, rel_data, &hitblocks,
+ &allblocks, targetPolicy);
+ } else {
+ rel_data->type = DATALOCALITY_PARQUET;
+ ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
+ context->split_size, rel_data, &hitblocks,
+ &allblocks, targetPolicy);
+ }
+ } else if (RelationIsExternal(rel)) {
+ if (isDataStoredInHdfs) {
+ // we can't use metadata cache, so hitblocks will always be 0
+ rel_data->type = DATALOCALITY_HDFS;
+ ExternalGetHdfsFileDataLocation(rel, context, context->split_size,
+ rel_data, &allblocks);
+ }
+ }
+
if (!isResultRelation) {
total_size += rel_data->total_size;
totalFileCount += list_length(rel_data->files);
@@ -915,8 +1005,7 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
}
context->total_file_count = totalFileCount;
context->total_size = total_size;
-
- context->metadata_cache_time_us = eclaspeTime;
+ context->metadata_cache_time_us = eclaspeTime;
if(debug_datalocality_time){
elog(LOG, "metadata overall execution time: %d us. \n", eclaspeTime);
@@ -924,6 +1013,25 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
return total_size;
}
+/*
+ * dataStoredInHdfs
+ *	Tell whether the data of rel is stored in HDFS.
+ *
+ * Append-only row tables and parquet tables always qualify. An external
+ * table qualifies only when:
+ *   - it has no command,
+ *   - it has at least one location, and
+ *   - its first location parses as a URI that is_hdfs_protocol() accepts.
+ * Every other relation kind yields false.
+ */
+bool dataStoredInHdfs(Relation rel) {
+	ExtTableEntry *entry;
+	char	   *firstLocation;
+	Uri		   *parsedUri;
+
+	if (RelationIsAoRows(rel) || RelationIsParquet(rel))
+		return true;
+
+	if (!RelationIsExternal(rel))
+		return false;
+
+	entry = GetExtTableEntry(rel->rd_id);
+	if (entry->command || entry->locations == NIL)
+		return false;
+
+	/* only the first location is inspected */
+	firstLocation = (char *) strVal(lfirst(list_head(entry->locations)));
+	if (firstLocation == NULL)
+		return false;
+
+	parsedUri = ParseExternalTableUri(firstLocation);
+	return parsedUri != NULL && is_hdfs_protocol(parsedUri);
+}
/*
* search_host_in_stat_context: search a host name in the statistic
* context; if not found, create a new one.
@@ -1579,7 +1687,237 @@ static void ParquetGetSegFileDataLocation(Relation relation,
return;
}
+/*
+ * InvokeHDFSProtocolBlockLocation
+ *    Call a protocol's block-location function and collect its result.
+ *
+ * procOid        - OID of the function to call (looked up by the caller).
+ * locs           - list of location URIs, passed to the function as the
+ *                  url_list of an ExtProtocolValidatorData call context with
+ *                  action EXT_VALID_ACT_GETBLKLOC.
+ * blockLocations - in/out list; every entry of the returned
+ *                  ExtProtocolBlockLocationData->files list is appended to
+ *                  it. Left unchanged when the function reports nothing.
+ *
+ * The function hands its result back through fcinfo.resultinfo rather
+ * than as a Datum return value.
+ */
+static void
+InvokeHDFSProtocolBlockLocation(Oid procOid,
+                                List *locs,
+                                List **blockLocations)
+{
+    ExtProtocolValidatorData *validator_data;
+    FmgrInfo   *validator_udf;
+    FunctionCallInfoData fcinfo;
+    ExtProtocolBlockLocationData *bls;
+    ListCell   *c;
+
+    validator_data = (ExtProtocolValidatorData *)
+        palloc0(sizeof(ExtProtocolValidatorData));
+    validator_udf = palloc(sizeof(FmgrInfo));
+    fmgr_info(procOid, validator_udf);
+
+    validator_data->type = T_ExtProtocolValidatorData;
+    validator_data->url_list = locs;
+    validator_data->format_opts = NULL;
+    validator_data->errmsg = NULL;
+    validator_data->direction = EXT_VALIDATE_READ;
+    /* Request block locations instead of a plain validation. */
+    validator_data->action = EXT_VALID_ACT_GETBLKLOC;
+
+    InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
+                             /* FmgrInfo */ validator_udf,
+                             /* nArgs */ 0,
+                             /* Call Context */ (Node *) validator_data,
+                             /* ResultSetInfo */ NULL);
+
+    /*
+     * Invoke the protocol function. The block locations come back through
+     * fcinfo.resultinfo (which InitFunctionCallInfoData set to NULL).
+     */
+    FunctionCallInvoke(&fcinfo);
+
+    bls = (ExtProtocolBlockLocationData *) (fcinfo.resultinfo);
+
+    /* Debug output of the reported block locations. */
+    if (bls != NULL)
+    {
+        /* Check the node type before walking any of its contents. */
+        Assert(bls->type == T_ExtProtocolBlockLocationData);
+
+        foreach(c, bls->files)
+        {
+            blocklocation_file *blf = (blocklocation_file *) (lfirst(c));
+            int         i;
+
+            elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks",
+                 blf->file_uri, blf->block_num);
+            for (i = 0; i < blf->block_num; ++i)
+            {
+                BlockLocation *pbl = &(blf->locations[i]);
+                int         j;
+
+                elog(DEBUG3, "DEBUG LOCATION for block %d : %d, "
+                     INT64_FORMAT ", " INT64_FORMAT ", %d",
+                     i,
+                     pbl->corrupt, pbl->length, pbl->offset,
+                     pbl->numOfNodes);
+                for (j = 0; j < pbl->numOfNodes; ++j)
+                {
+                    elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s",
+                         i,
+                         pbl->hosts[j], pbl->names[j],
+                         pbl->topologyPaths[j]);
+                }
+            }
+        }
+    }
+    elog(DEBUG3, "after invoking get block location API");
+
+    /*
+     * Move the reported files onto the caller's list. The entries themselves
+     * are handed over; only the source list cells are released.
+     */
+    if (bls != NULL)
+    {
+        foreach(c, bls->files)
+            *blockLocations = lappend(*blockLocations, lfirst(c));
+        list_free(bls->files);
+        bls->files = NIL;
+    }
+
+    pfree(validator_data);
+    pfree(validator_udf);
+}
+
+/*
+ * LookupCustomProtocolBlockLocationFunc
+ *    Look up the "<protoname>_blocklocation" function of an external protocol.
+ *
+ * The function must exist with zero arguments, return void and be declared
+ * STABLE; otherwise an ERROR is raised. Returns the function's OID.
+ */
+Oid
+LookupCustomProtocolBlockLocationFunc(char *protoname)
+{
+    static const char suffix[] = "_blocklocation";
+    const Oid   returnOid = VOIDOID;
+    Oid         argList[1] = {InvalidOid};  /* unused: the function takes no args */
+    List       *funcname;
+    Oid         procOid;
+    size_t      name_len;
+    char       *new_func_name;
+
+    /*
+     * Build "<protoname>_blocklocation". The buffer size is derived from the
+     * suffix (sizeof includes the terminating NUL) rather than a magic number.
+     */
+    name_len = strlen(protoname) + sizeof(suffix);
+    new_func_name = (char *) palloc0(name_len);
+    snprintf(new_func_name, name_len, "%s%s", protoname, suffix);
+    funcname = list_make1(makeString(new_func_name));
+
+    procOid = LookupFuncName(funcname, 0, argList, true);
+
+    if (!OidIsValid(procOid))
+        ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
+                        errmsg("protocol function %s was not found.",
+                               new_func_name),
+                        errhint("Create it with CREATE FUNCTION."),
+                        errOmitLocation(true)));
+
+    /* check return type matches */
+    if (get_func_rettype(procOid) != returnOid)
+        ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+                        errmsg("protocol function %s has an incorrect return type",
+                               new_func_name),
+                        errOmitLocation(true)));
+
+    /* check allowed volatility */
+    if (func_volatile(procOid) != PROVOLATILE_STABLE)
+        ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+                        errmsg("protocol function %s is not declared STABLE.",
+                               new_func_name),
+                        errOmitLocation(true)));
+    pfree(new_func_name);
+
+    return procOid;
+}
+
+/*
+ * ExternalGetHdfsFileDataLocation
+ *    Build data-locality information for an external table stored on HDFS.
+ *
+ * The block locations of the files behind the table's LOCATION list are
+ * fetched through the "hdfs_blocklocation" protocol function. Each file
+ * becomes a Relation_File appended to rel_data->files, with one split per
+ * reported block; the final split is clipped to the file's logical length.
+ *
+ * relation  - the external relation (not referenced here; the external
+ *             table entry is looked up through rel_data->relid).
+ * context   - data-locality context; host statistics, total_split_count
+ *             and total_metadata_logic_len are updated.
+ * splitsize - currently unused: splits follow the reported block boundaries.
+ * rel_data  - receives the file list and total_size.
+ * allblocks - incremented by every file's reported block count.
+ *
+ * Block locations are only fetched when Gp_role == GP_ROLE_DISPATCH;
+ * otherwise no files are added and total_size is 0.
+ */
+static void
+ExternalGetHdfsFileDataLocation(Relation relation,
+                                split_to_segment_mapping_context *context,
+                                int64 splitsize,
+                                Relation_Data *rel_data,
+                                int *allblocks)
+{
+    ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid);
+    int64       total_size = 0;
+    int         segno = 1;      /* fallback segno for non-numeric file names */
+    char       *first_uri_str;
+    Uri        *uri;
+    bool        isHdfs;
+    Oid         procOid = InvalidOid;
+    List       *bls = NIL;      /* Block locations */
+    ListCell   *cbl;
+
+    Assert(ext_entry->locations != NIL);
+
+    /*
+     * Step 1. get external HDFS location from URI.
+     */
+    first_uri_str = (char *) strVal(linitial(ext_entry->locations));
+    /* We must have at least one location. */
+    Assert(first_uri_str != NULL);
+    uri = ParseExternalTableUri(first_uri_str);
+    isHdfs = (uri != NULL && is_hdfs_protocol(uri));
+    Assert(isHdfs);             /* Currently, we accept HDFS only. */
+
+    /*
+     * Step 2. Get the function to call for location information. This work
+     * is done by the block-location function registered for this protocol.
+     */
+    if (isHdfs)
+        procOid = LookupCustomProtocolBlockLocationFunc("hdfs");
+
+    /*
+     * Step 3. Call it with the location strings to get the location data.
+     * This is only done on the dispatcher.
+     */
+    if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
+        InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations, &bls);
+
+    /*
+     * Step 4. Build data location info for optimization after this call.
+     */
+    foreach(cbl, bls)
+    {
+        blocklocation_file *f = (blocklocation_file *) lfirst(cbl);
+        BlockLocation *locations = f->locations;
+        int         block_num = f->block_num;
+        int64       logic_len = 0;
+        Block_Host_Index *host_index;
+        Relation_File *file;
+        File_Split *splits;
+        const char *slash;
+        int         uri_segno;
+        int         split_num;
+        int         realSplitNum = 0;
+        int64       offset = 0;
+        int         j;
+
+        *allblocks += block_num;
+        if (locations == NULL || block_num <= 0)
+            continue;
+
+        /* The file's logical length is the sum of its block lengths. */
+        for (j = 0; j < block_num; ++j)
+            logic_len += locations[j].length;
+        total_size += logic_len;
+
+        host_index = update_data_dist_stat(context, locations, block_num);
+
+        /* Zeroed so that fields not assigned here are not left as garbage. */
+        file = (Relation_File *) palloc0(sizeof(Relation_File));
+
+        /*
+         * Use a positive numeric last path component as segno; otherwise
+         * assign sequential numbers. strrchr() returns NULL for a URI
+         * without '/', which must not be dereferenced.
+         */
+        slash = strrchr(f->file_uri, '/');
+        uri_segno = (slash != NULL) ? atoi(slash + 1) : 0;
+        file->segno = (uri_segno > 0) ? uri_segno : segno++;
+        file->block_num = block_num;
+        file->locations = locations;
+        file->hostIDs = host_index;
+        file->logic_len = logic_len;
+
+        /* Split logic: one split per block, last one clipped to EOF. */
+        split_num = file->block_num;
+        splits = (File_Split *) palloc0(sizeof(File_Split) * split_num);
+        while (realSplitNum < split_num)
+        {
+            File_Split *split = &splits[realSplitNum];
+
+            split->host = -1;
+            split->is_local_read = true;
+            split->offset = offset;
+            split->length = file->locations[realSplitNum].length;
+            split->logiceof = logic_len;
+            split->ext_file_uri = pstrdup(f->file_uri);
+
+            if (logic_len - offset <= split->length)
+            {
+                split->length = logic_len - offset;
+                ++realSplitNum;
+                break;
+            }
+            offset += split->length;
+            ++realSplitNum;
+        }
+        file->split_num = realSplitNum;
+        file->splits = splits;
+        context->total_split_count += realSplitNum;
+
+        rel_data->files = lappend(rel_data->files, file);
+    }
+    context->total_metadata_logic_len += total_size;
+    rel_data->total_size = total_size;
+}
/*
* step 1 search segments with local read and segment is not full after being assigned the block
* step 2 search segments with local read and segment is not full before being assigned the block
@@ -4021,14 +4359,17 @@ void find_udf(Query *query, udf_collector_context *context) {
* fixedVsegNum is used by PBE, since all the execute should use the same number of vsegs.
*/
SplitAllocResult *
-calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
- List *fullRangeTable, GpPolicy *intoPolicy, int sliceNum, int fixedVsegNum) {
+calculate_planner_segment_num(PlannedStmt *plannedstmt, Query *query,
+ QueryResourceLife resourceLife, int fixedVsegNum) {
SplitAllocResult *result = NULL;
QueryResource *resource = NULL;
-
List *virtual_segments = NIL;
List *alloc_result = NIL;
+ Node *planTree = plannedstmt->planTree;
+ GpPolicy *intoPolicy = plannedstmt->intoPolicy;
+ int sliceNum = plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1;
split_to_segment_mapping_context context;
+ context.chsl_context.relations = NIL;
int planner_segments = 0; /*virtual segments number for explain statement */
@@ -4061,9 +4402,11 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
{
init_datalocality_memory_context();
- init_datalocality_context(&context);
+ init_datalocality_context(plannedstmt, &context);
+
+ collect_range_tables(query, &(context.rtc_context));
- collect_range_tables(query, fullRangeTable, &(context.rtc_context));
+ collect_scan_rangetable(planTree, &(context.srtc_context));
bool isTableFunctionExists = false;
@@ -4104,6 +4447,9 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
/* get block location and calculate relation size*/
get_block_locations_and_claculte_table_size(&context);
+ if(context.chsl_context.relations){
+ Relation_Data* tmp = (Relation_Data*) lfirst(context.chsl_context.relations->tail);
+ }
/*use inherit resource*/
if (resourceLife == QRL_INHERIT) {
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/cdb/cdbpartition.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbpartition.c b/src/backend/cdb/cdbpartition.c
index a887ee4..963983f 100644
--- a/src/backend/cdb/cdbpartition.c
+++ b/src/backend/cdb/cdbpartition.c
@@ -4787,9 +4787,9 @@ get_part_rule(Relation rel,
static void
fixup_table_storage_options(CreateStmt *ct)
{
- if (!ct->options)
+ if (!ct->base.options)
{
- ct->options = list_make2(makeDefElem("appendonly",
+ ct->base.options = list_make2(makeDefElem("appendonly",
(Node *)makeString("true")),
makeDefElem("orientation",
(Node *)makeString("column")));
@@ -5243,11 +5243,11 @@ atpxPartAddList(Relation rel,
ct = (CreateStmt *)linitial((List *)pUtl);
Assert(IsA(ct, CreateStmt));
- ct->is_add_part = true; /* subroutines need to know this */
+ ct->base.is_add_part = true; /* subroutines need to know this */
ct->ownerid = ownerid;
- if (!ct->distributedBy)
- ct->distributedBy = make_dist_clause(rel);
+ if (!ct->base.distributedBy)
+ ct->base.distributedBy = make_dist_clause(rel);
if (bSetTemplate)
/* if creating a template, silence partition name messages */
@@ -6377,7 +6377,7 @@ atpxPartAddList(Relation rel,
* name of the parent relation
*/
- ct->relation = makeRangeVar(
+ ct->base.relation = makeRangeVar(
NULL /*catalogname*/,
get_namespace_name(RelationGetNamespace(par_rel)),
RelationGetRelationName(par_rel), -1);
@@ -6429,7 +6429,7 @@ atpxPartAddList(Relation rel,
Oid skipTableRelid = InvalidOid;
List *attr_encodings = NIL;
- ct->partitionBy = (Node *)pBy;
+ ct->base.partitionBy = (Node *)pBy;
/* this parse_analyze expands the phony create of a partitioned table
* that we just build into the constituent commands we need to create
@@ -6517,7 +6517,7 @@ atpxPartAddList(Relation rel,
gs->objtype = ACL_OBJECT_RELATION;
gs->cooked_privs = cp;
- gs->objects = list_make1(copyObject(t->relation));
+ gs->objects = list_make1(copyObject(t->base.relation));
pt = parse_analyze((Node *)gs, NULL, NULL, 0);
l1 = list_concat(l1, pt);
@@ -6545,7 +6545,7 @@ atpxPartAddList(Relation rel,
{
CreateStmt *t = (CreateStmt *)((Query *)s)->utilityStmt;
- skipTableRelid = RangeVarGetRelid(t->relation, true, false /*allowHcatalog*/);
+ skipTableRelid = RangeVarGetRelid(t->base.relation, true, false /*allowHcatalog*/);
}
}
@@ -8376,7 +8376,7 @@ fixCreateStmtForPartitionedTable(CreateStmt *stmt)
/* Caller should check this! */
Assert(stmt->partitionBy && !stmt->is_part_child);
- foreach( lc_elt, stmt->tableElts )
+ foreach( lc_elt, stmt->base.tableElts )
{
Node * elt = lfirst(lc_elt);
@@ -8512,7 +8512,7 @@ fixCreateStmtForPartitionedTable(CreateStmt *stmt)
FkConstraint *fcon = list_nth(unnamed_cons, i);
fcon->constr_name =
- ChooseConstraintNameForPartitionCreate(stmt->relation->relname,
+ ChooseConstraintNameForPartitionCreate(stmt->base.relation->relname,
colname,
label,
used_names);
@@ -8528,7 +8528,7 @@ fixCreateStmtForPartitionedTable(CreateStmt *stmt)
if ( 0 != strcmp(label, "pkey") )
colname = list_nth(unnamed_cons_col, i);
- con->name = ChooseConstraintNameForPartitionCreate(stmt->relation->relname,
+ con->name = ChooseConstraintNameForPartitionCreate(stmt->base.relation->relname,
colname,
label,
used_names);