Posted to commits@hawq.apache.org by hu...@apache.org on 2018/08/14 02:15:39 UTC

[4/4] incubator-hawq git commit: Add hdfs protocol for pluggable storage framework

Add hdfs protocol for pluggable storage framework


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/1c189fc1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/1c189fc1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/1c189fc1

Branch: refs/heads/master
Commit: 1c189fc12bf357bed36a4ef85d973a49eb26cd28
Parents: 9f33d8d
Author: oushu1wangziming1 <wa...@oushu.io>
Authored: Tue Jul 17 15:43:36 2018 +0800
Committer: Ruilong Huo <hu...@163.com>
Committed: Tue Aug 14 10:14:46 2018 +0800

----------------------------------------------------------------------
 contrib/Makefile                                |    1 +
 contrib/exthdfs/Makefile                        |   34 +
 contrib/exthdfs/common.h                        |   38 +
 contrib/exthdfs/exthdfs.c                       |  469 ++++++++
 contrib/extprotocol/gpextprotocol.c             |    2 +-
 src/backend/access/external/fileam.c            |  740 ++++++++----
 src/backend/access/external/plugstorage.c       |   30 +-
 src/backend/catalog/cdb_external_extensions.sql |   12 +
 src/backend/catalog/heap.c                      |   79 +-
 src/backend/cdb/cdbdatalocality.c               |  434 ++++++-
 src/backend/cdb/cdbpartition.c                  |   24 +-
 src/backend/commands/analyze.c                  |  417 ++++++-
 src/backend/commands/copy.c                     |   22 +-
 src/backend/commands/indexcmds.c                |    1 +
 src/backend/commands/sequence.c                 |   22 +-
 src/backend/commands/tablecmds.c                |  870 +++++++++++---
 src/backend/commands/typecmds.c                 |   19 +-
 src/backend/commands/user.c                     |   35 +-
 src/backend/commands/view.c                     |   19 +-
 src/backend/executor/execDML.c                  |   20 +-
 src/backend/nodes/copyfuncs.c                   |   40 +-
 src/backend/nodes/equalfuncs.c                  |   39 +-
 src/backend/nodes/outfast.c                     |   40 +-
 src/backend/nodes/outfuncs.c                    |   43 +-
 src/backend/nodes/readfast.c                    |   60 +-
 src/backend/nodes/readfuncs.c                   |   44 +-
 src/backend/optimizer/plan/createplan.c         |   44 +-
 src/backend/optimizer/plan/planner.c            |   10 +-
 src/backend/parser/analyze.c                    | 1098 +++++++++++++-----
 src/backend/parser/gram.y                       |  178 ++-
 src/backend/tcop/utility.c                      |   48 +-
 src/backend/utils/misc/uriparser.c              |   10 +-
 src/include/access/fileam.h                     |   13 +-
 src/include/access/formatter.h                  |   10 +
 src/include/access/plugstorage.h                |    6 +-
 src/include/catalog/pg_exttable.h               |    7 +-
 src/include/cdb/cdbdatalocality.h               |   11 +-
 src/include/commands/tablecmds.h                |    4 +-
 src/include/nodes/parsenodes.h                  |  118 +-
 src/include/parser/analyze.h                    |   50 +-
 src/include/utils/uri.h                         |    5 +-
 41 files changed, 4016 insertions(+), 1150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/Makefile
----------------------------------------------------------------------
diff --git a/contrib/Makefile b/contrib/Makefile
index 695e92a..e5daff9 100644
--- a/contrib/Makefile
+++ b/contrib/Makefile
@@ -9,6 +9,7 @@ WANTED_DIRS = \
 		extprotocol \
 		gp_cancel_query \
 		formatter_fixedwidth \
+		exthdfs \
 		hawq-hadoop
 
 ifeq ($(with_pgcrypto), yes)

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/exthdfs/Makefile
----------------------------------------------------------------------
diff --git a/contrib/exthdfs/Makefile b/contrib/exthdfs/Makefile
new file mode 100644
index 0000000..e247664
--- /dev/null
+++ b/contrib/exthdfs/Makefile
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+MODULE_big = exthdfs
+OBJS       = exthdfs.o
+
+PG_CPPFLAGS = -I$(libpq_srcdir)
+PG_LIBS = $(libpq_pgport)
+
+override CFLAGS += -lhdfs3
+
+ifdef USE_PGXS
+PGXS := $(shell pg_config --pgxs)
+include $(PGXS)
+else
+subdir = contrib/exthdfs
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/exthdfs/common.h
----------------------------------------------------------------------
diff --git a/contrib/exthdfs/common.h b/contrib/exthdfs/common.h
new file mode 100644
index 0000000..4111909
--- /dev/null
+++ b/contrib/exthdfs/common.h
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _EXTHDFS_COMMON_H_
+#define _EXTHDFS_COMMON_H_
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "funcapi.h"
+#include "access/extprotocol.h"
+#include "access/fileam.h"
+#include "catalog/pg_proc.h"
+#include "catalog/pg_exttable.h"
+#include "utils/array.h"
+#include "utils/builtins.h"
+#include "utils/memutils.h"
+#include "miscadmin.h"
+
+#include <fcntl.h>
+
+#endif  // _EXTHDFS_COMMON_H_
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/exthdfs/exthdfs.c
----------------------------------------------------------------------
diff --git a/contrib/exthdfs/exthdfs.c b/contrib/exthdfs/exthdfs.c
new file mode 100644
index 0000000..1378734
--- /dev/null
+++ b/contrib/exthdfs/exthdfs.c
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "postgres.h"
+
+#include "common.h"
+#include "access/extprotocol.h"
+#include "cdb/cdbdatalocality.h"
+#include "storage/fd.h"
+#include "storage/filesystem.h"
+#include "utils/uri.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1(hdfsprotocol_blocklocation);
+PG_FUNCTION_INFO_V1(hdfsprotocol_validate);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS);
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS);
+
+Datum hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
+{
+
+	// Build the result instance
+	int nsize = 0;
+	int numOfBlock = 0;
+	ExtProtocolBlockLocationData *bldata =
+		palloc0(sizeof(ExtProtocolBlockLocationData));
+	if (bldata == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+                    "failed to allocate memory for result data");
+	}
+	bldata->type = T_ExtProtocolBlockLocationData;
+	fcinfo->resultinfo = bldata;
+
+	ExtProtocolValidatorData *pvalidator_data = (ExtProtocolValidatorData *)
+												(fcinfo->context);
+
+	// Parse the URI of the first location; we expect all locations to use
+	// the same name node server, which is checked in the validation function.
+
+	char *first_uri_str = (char *)strVal(lfirst(list_head(pvalidator_data->url_list)));
+	Uri *uri = ParseExternalTableUri(first_uri_str);
+
+	elog(DEBUG3, "hdfsprotocol_blocklocation : "
+				 "extracted HDFS name node address %s:%d",
+				 uri->hostname, uri->port);
+
+	// Create file system instance
+	hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+	if (fs == NULL)
+	{
+		elog(ERROR, "hdfsprotocol_blocklocation : "
+					"failed to create HDFS instance to connect to %s:%d",
+					uri->hostname, uri->port);
+	}
+
+	// Clean up uri instance as we don't need it any longer
+	FreeExternalTableUri(uri);
+
+	// Walk all locations to collect the files whose block locations we fetch.
+	ListCell *lc = NULL;
+	foreach(lc, pvalidator_data->url_list)
+	{
+		// Parse current location URI.
+		char *url = (char *)strVal(lfirst(lc));
+		Uri *uri = ParseExternalTableUri(url);
+		if (uri == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+						"invalid URI encountered %s", url);
+		}
+
+		//
+		// NOTICE: We temporarily support only directories as locations. We
+		// plan to extend the logic to accept a single file as a location
+		// very soon.
+		//
+
+		// Get the files contained in the path.
+		hdfsFileInfo *fiarray = hdfsListDirectory(fs, uri->path, &nsize);
+		if (fiarray == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_blocklocation : "
+						"failed to get files of path %s",
+						uri->path);
+		}
+
+		// Call the block location API to get the data location of each file
+		int i = 0;
+		for (i = 0 ; i < nsize ; i++)
+		{
+			hdfsFileInfo *fi = &fiarray[i];
+
+			// Build the file's full path.
+			const char *fname = fi->mName;
+			char *fullpath = palloc0(strlen(fname) +  // name
+									 1);              // terminating '\0'
+			sprintf(fullpath, "%s", fname);
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : "
+						 "built full path file %s", fullpath);
+
+			// Get file full length.
+			int64_t len = fi->mSize;
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : "
+					     "got file %s length " INT64_FORMAT,
+					     fullpath, len);
+
+			if (len == 0) {
+				pfree(fullpath);
+				continue;
+			}
+
+			// Get block location data for this file
+			BlockLocation *bla = hdfsGetFileBlockLocations(fs, fullpath, 0, len, &numOfBlock);
+			if (bla == NULL)
+			{
+				elog(ERROR, "hdfsprotocol_blocklocation : "
+							"failed to get block locations of path %s. "
+							"This is generally caused by HDFS service errors or "
+							"by another session writing the file.",
+							fullpath);
+			}
+
+			// Add file full path and its block number as result.
+			blocklocation_file *blf = palloc0(sizeof(blocklocation_file));
+			blf->file_uri = pstrdup(fullpath);
+			blf->block_num = numOfBlock;
+			blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num);
+
+			elog(DEBUG3, "hdfsprotocol_blocklocation : file %s has %d blocks",
+					  	 fullpath, blf->block_num);
+
+			// We don't need it any longer
+			pfree(fullpath);
+			int bidx = 0;
+			// Add block information as a list.
+			for (bidx = 0 ; bidx < blf->block_num ; bidx++)
+			{
+				BlockLocation *blo = &bla[bidx];
+				BlockLocation *bl = &(blf->locations[bidx]);
+				bl->numOfNodes = blo->numOfNodes;
+				bl->hosts = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->names = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->topologyPaths = (char **)palloc0(sizeof(char *) * bl->numOfNodes);
+				bl->offset = blo->offset;
+				bl->length = blo->length;
+				bl->corrupt = blo->corrupt;
+
+				int nidx = 0;
+				for (nidx = 0 ; nidx < bl->numOfNodes ; nidx++)
+				{
+					// hosts/names/topologyPaths hold one entry per node of
+					// the current block, so index the arrays, not the block.
+					bl->hosts[nidx] = pstrdup(blo->hosts[nidx]);
+					bl->names[nidx] = pstrdup(blo->names[nidx]);
+					bl->topologyPaths[nidx] = pstrdup(blo->topologyPaths[nidx]);
+				}
+			}
+
+			bldata->files = lappend(bldata->files, (void *)(blf));
+
+			// Clean up block location instances created by the lib.
+			hdfsFreeFileBlockLocations(bla, numOfBlock);
+		}
+
+		// Clean up URI instance in loop as we don't need it any longer
+		FreeExternalTableUri(uri);
+
+		// Clean up file info array created by the lib for this location.
+		hdfsFreeFileInfo(fiarray, nsize);
+	}
+
+	// destroy fs instance
+	hdfsDisconnect(fs);
+
+	PG_RETURN_VOID();
+
+}
+
+Datum hdfsprotocol_validate(PG_FUNCTION_ARGS)
+{
+	elog(DEBUG3, "hdfsprotocol_validate() begin");
+
+	/* Check which action to perform. */
+	ExtProtocolValidatorData *pvalidator_data =
+       (ExtProtocolValidatorData *)(fcinfo->context);
+
+	if (pvalidator_data->forceCreateDir)
+		Assert(pvalidator_data->url_list && pvalidator_data->url_list->length == 1);
+
+	if (pvalidator_data->direction == EXT_VALIDATE_WRITE)
+	{
+		/* accept only one directory location */
+		if (list_length(pvalidator_data->url_list) != 1)
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_SYNTAX_ERROR),
+					 errmsg("hdfsprotocol_validate : "
+							"only one location url is supported for writable external hdfs")));
+		}
+	}
+
+	/* Go through first round to get formatter type */
+	bool isCsv = false;
+	bool isText = false;
+	bool isOrc = false;
+	ListCell *optcell = NULL;
+	foreach(optcell, pvalidator_data->format_opts)
+	{
+		DefElem *de = (DefElem *)lfirst(optcell);
+		if (strcasecmp(de->defname, "formatter") == 0)
+		{
+			char *val = strVal(de->arg);
+			if (strcasecmp(val, "csv") == 0)
+			{
+				isCsv = true;
+			}
+			else if (strcasecmp(val, "text") == 0)
+			{
+				isText = true;
+			}
+			else if (strcasecmp(val, "orc") == 0)
+			{
+				isOrc = true;
+			}
+		}
+	}
+	if (!isCsv && !isText && !isOrc)
+	{
+		ereport(ERROR,
+				(errcode(ERRCODE_SYNTAX_ERROR),
+				 errmsg("hdfsprotocol_validate : "
+						"only the 'csv', 'text' and 'orc' formatters are supported for external hdfs")));
+	}
+	Assert(isCsv || isText || isOrc);
+
+	/* Validate formatter options */
+	foreach(optcell, pvalidator_data->format_opts)
+	{
+		DefElem *de = (DefElem *)lfirst(optcell);
+		if (strcasecmp(de->defname, "delimiter") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 1. User cannot specify 'OFF' as the delimiter */
+			if (strcasecmp(val, "off") == 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'off' value of 'delimiter' option is not supported")));
+			}
+			/* Validation 2. Delimiter must be at least one character;
+			 * multibyte characters are accepted */
+			if (strlen(val) < 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'delimiter' option must not be empty")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "escape") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 3. User cannot specify 'OFF' as the escape character */
+			if (strcasecmp(val, "off") == 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"'off' value of 'escape' option is not supported")));
+			}
+			/* Validation 4. Can only specify one character */
+			if (strlen(val) != 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'escape' option accepts a single character")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "newline") == 0)
+		{
+			char *val = strVal(de->arg);
+			/* Validation 5. Only 'lf', 'cr' and 'crlf' are accepted */
+			if (strcasecmp(val, "lf") != 0 &&
+				strcasecmp(val, "cr") != 0 &&
+				strcasecmp(val, "crlf") != 0)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+						 errmsg("hdfsprotocol_validate : "
+								"the value of 'newline' option can only be "
+								"'lf', 'cr' or 'crlf'")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "quote") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'quote' option is only available in 'csv' formatter")));
+			}
+
+			char *val = strVal(de->arg);
+			/* Validation 6. Can only specify one character */
+			if (strlen(val) != 1)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'quote' option accepts a single character")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "force_notnull") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'force_notnull' option is only available in 'csv' formatter")));
+			}
+		}
+
+		if (strcasecmp(de->defname, "force_quote") == 0)
+		{
+			/* This is allowed only for csv mode formatter */
+			if (!isCsv)
+			{
+				ereport(ERROR,
+						(errcode(ERRCODE_SYNTAX_ERROR),
+								 errmsg("hdfsprotocol_validate : "
+										"'force_quote' option is only available in 'csv' formatter")));
+			}
+		}
+	}
+
+	/* All URLs must
+	 * 1) use the 'hdfs' protocol, and
+	 * 2) point to the same HDFS namenode address and port
+	 */
+	/* Walk all locations and validate each URI. */
+	char *nnaddr = NULL;
+	int nnport = -1;
+	ListCell *lc = NULL;
+	foreach(lc, pvalidator_data->url_list)
+	{
+		/* Parse current location URI. */
+		char *url = (char *)strVal(lfirst(lc));
+		Uri *uri = ParseExternalTableUri(url);
+		if (uri == NULL)
+		{
+			elog(ERROR, "hdfsprotocol_validate : "
+						"invalid URI encountered %s", url);
+		}
+
+		if (uri->protocol != URI_HDFS)
+		{
+			elog(ERROR, "hdfsprotocol_validate : "
+						"invalid URI protocol encountered in %s, "
+						"hdfs:// protocol is required",
+						url);
+		}
+
+		if (nnaddr == NULL)
+		{
+			nnaddr = pstrdup(uri->hostname);
+			nnport = uri->port;
+		}
+		else
+		{
+			if (strcmp(nnaddr, uri->hostname) != 0)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"different name server addresses are detected, "
+							"both %s and %s are found",
+							nnaddr, uri->hostname);
+			}
+			if (nnport != uri->port)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"different name server ports are detected, "
+							"both %d and %d are found",
+							nnport, uri->port);
+			}
+		}
+
+		/* TODO: add logic here to create the path if it does not exist */
+		if (pvalidator_data->forceCreateDir)
+		{
+			elog(LOG, "hdfs_validator() forced creating dir");
+
+			/* Create file system instance */
+			hdfsFS fs = hdfsConnect(uri->hostname, uri->port);
+			if (fs == NULL)
+			{
+				elog(ERROR, "hdfsprotocol_validate : "
+							"failed to create HDFS instance to connect to %s:%d",
+							uri->hostname, uri->port);
+			}
+
+			if (hdfsExists(fs, uri->path) == -1)
+				elog(ERROR, "hdfsprotocol_validate : "
+							"location \"%s\" does not exist",
+							uri->path);
+
+			/* destroy fs instance */
+			hdfsDisconnect(fs);
+		}
+
+		/* Clean up temporarily created instances */
+		FreeExternalTableUri(uri);
+	}
+
+	/* Free the saved namenode address only after the loop, since it is
+	 * compared against every remaining URL. */
+	if (nnaddr != NULL)
+	{
+		pfree(nnaddr);
+	}
+
+	elog(LOG, "passed validating hdfs protocol options");
+
+	/**************************************************************************
+	 * Checking formatter options here is a stopgap; this should be moved
+	 * into formatter-specific validation UDFs.
+	 **************************************************************************/
+
+	PG_RETURN_VOID();
+}
+

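Note on consuming the result above: hdfsprotocol_blocklocation() hands its output back through fcinfo->resultinfo as a list of blocklocation_file entries. A minimal caller-side sketch of walking that structure follows; the type and field names are taken from the code above, while report_block_locations() itself is a hypothetical illustration, not part of this commit.

    /* Hypothetical walk over the result built by hdfsprotocol_blocklocation().
     * Assumes the ExtProtocolBlockLocationData/blocklocation_file layout
     * shown in exthdfs.c above. */
    static void
    report_block_locations(ExtProtocolBlockLocationData *bldata)
    {
        ListCell *cell = NULL;
        foreach(cell, bldata->files)
        {
            blocklocation_file *blf = (blocklocation_file *) lfirst(cell);
            for (int b = 0; b < blf->block_num; b++)
            {
                BlockLocation *bl = &blf->locations[b];
                elog(DEBUG3, "%s: block %d offset " INT64_FORMAT
                     " length " INT64_FORMAT " on %d nodes",
                     blf->file_uri, b, (int64) bl->offset,
                     (int64) bl->length, bl->numOfNodes);
            }
        }
    }
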
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/contrib/extprotocol/gpextprotocol.c
----------------------------------------------------------------------
diff --git a/contrib/extprotocol/gpextprotocol.c b/contrib/extprotocol/gpextprotocol.c
index 419b923..07f1731 100644
--- a/contrib/extprotocol/gpextprotocol.c
+++ b/contrib/extprotocol/gpextprotocol.c
@@ -324,5 +324,5 @@ void FreeDemoUri(DemoUri *uri)
 	if (uri->protocol)
 		pfree(uri->protocol);
 	
-	pfree(uri);
+	FreeExternalTableUri(uri);
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 692a5db..0d2527d 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -86,14 +86,17 @@ static void InitParseState(CopyState pstate, Relation relation,
 						   char *uri, int rejectlimit,
 						   bool islimitinrows, Oid fmterrtbl, ResultRelSegFileInfo *segfileinfo, int encoding);
 
-static void FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
-										 int					nArgs,
-										 CopyState 				pstate,
-										 FormatterData*			formatter,
-										 Relation 				rel,
-										 TupleDesc 				tupDesc,
-										 FmgrInfo			   *convFuncs,
-										 Oid                   *typioparams);
+static void
+FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
+							 int					nArgs,
+							 CopyState 				pstate,
+							 FormatterData		   *formatter,
+							 Relation 				rel,
+							 TupleDesc 				tupDesc,
+							 FmgrInfo			   *convFuncs,
+							 Oid                   *typioparams,
+							 char				   *url,
+							 ScanState			   *ss);
 
 static void open_external_readable_source(FileScanDesc scan);
 static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
@@ -305,12 +308,54 @@ external_beginscan(ExternalScan *extScan,
 	scan->errcontext.previous = error_context_stack;
 
 	//pgstat_initstats(relation);
-
+	external_populate_formatter_actionmask(scan->fs_pstate, scan->fs_formatter);
 	return scan;
 }
 
 
 /* ----------------
+ * external_populate_formatter_actionmask
+ * ----------------
+ */
+void external_populate_formatter_actionmask(CopyState pstate,
+											FormatterData *formatter)
+{
+	/* We just call the formatter function once to populate the mask */
+	formatter->fmt_mask = FMT_UNSET;
+
+	if (pstate->custom_formatter_func == NULL)
+	{
+		formatter->fmt_mask |= FMT_NEEDEXTBUFF;
+		elog(LOG, "external scan needs an external protocol to cooperate");
+		return;
+	}
+
+	Datum d;
+	FunctionCallInfoData fcinfo;
+	/* per call formatter prep */
+	FunctionCallPrepareFormatter(&fcinfo,
+								 0,
+								 pstate,
+								 formatter,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL,
+								 NULL);
+	d = FunctionCallInvoke(&fcinfo);
+
+	if (formatter->fmt_mask & FMT_NEEDEXTBUFF)
+	{
+		elog(LOG, "external scan needs an external protocol to cooperate");
+	}
+	else
+	{
+		elog(LOG, "external scan needs only formatter to manipulate data");
+	}
+}
+
+/* ----------------
 *		external_rescan  - (re)start a scan of an external file
 * ----------------
 */
@@ -525,13 +570,15 @@ external_getnext(FileScanDesc scan,
 	/*
 	 * open the external source (local file or http).
 	 *
-	 * NOTE: external_beginscan() seems like the natural place for this call. However,
-	 * in queries with more than one gang each gang will initialized all the nodes
-	 * of the plan (but actually executed only the nodes in it's local slice)
-	 * This means that external_beginscan() (and external_endscan() too) will get called
-	 * more than needed and we'll end up opening too many http connections when
-	 * they are not expected (see MPP-1261). Therefore we instead do it here on the
-	 * first time around only.
+	 * NOTE: external_beginscan() seems like the natural place for this call.
+	 * However, in queries with more than one gang, each gang will initialize
+	 * all the nodes of the plan (but actually execute only the nodes in its
+	 * local slice).
+	 *
+	 * This means that external_beginscan() (and external_endscan() too) will
+	 * get called more than needed and we'll end up opening too many http
+	 * connections when they are not expected (see MPP-1261). Therefore we
+	 * instead do it here on the first time around only.
 	 */
 
 	/*
@@ -539,7 +586,7 @@ external_getnext(FileScanDesc scan,
 	 * load external protocol.
 	 */
 
-	if (scan->fs_file == NULL)
+	if (scan->fs_file == NULL && (scan->fs_formatter->fmt_mask & FMT_NEEDEXTBUFF))
 		open_external_readable_source(scan);
 
 	/* Note: no locking manipulations needed */
@@ -567,6 +614,7 @@ external_getnext(FileScanDesc scan,
 	return true;
 }
 
+
 /*
  * external_insert_init
  *
@@ -575,7 +623,7 @@ external_getnext(FileScanDesc scan,
  */
 ExternalInsertDesc
 external_insert_init(Relation rel, int errAosegno,
-                     ExternalTableType formatterType, char *formatterName)
+                     int formatterType, char *formatterName, PlannedStmt* plannedstmt)
 {
 	ExternalInsertDesc	extInsertDesc;
 	ExtTableEntry*		extentry;
@@ -634,8 +682,12 @@ external_insert_init(Relation rel, int errAosegno,
 		/* get a url to use. we use seg number modulo total num of urls */
 		v = list_nth(extentry->locations, my_url);
 		uri_str = pstrdup(v->val.str);
+		Uri* uri = ParseExternalTableUri(uri_str);
+
 		extInsertDesc->ext_uri = uri_str;
 
+		FreeExternalTableUri(uri);
+
 		/*elog(NOTICE, "seg %d got url number %d: %s", segindex, my_url, uri_str);*/
 	}
 
@@ -668,6 +720,10 @@ external_insert_init(Relation rel, int errAosegno,
 	{
 		extInsertDesc->ext_formatter_data = (FormatterData *) palloc0 (sizeof(FormatterData));
 		extInsertDesc->ext_formatter_data->fmt_perrow_ctx = extInsertDesc->ext_pstate->rowcontext;
+
+		/* First call the formatter function to get the action mask */
+		external_populate_formatter_actionmask(extInsertDesc->ext_pstate,
+											   extInsertDesc->ext_formatter_data);
 	}
 
 	return extInsertDesc;
@@ -687,7 +743,6 @@ external_insert_init(Relation rel, int errAosegno,
 Oid
 external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
 {
-
 	HeapTuple		instup = ExecFetchSlotHeapTuple(tupTableSlot);
 	TupleDesc 		tupDesc = extInsertDesc->ext_tupDesc;
 	Datum*			values = extInsertDesc->ext_values;
@@ -695,14 +750,18 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
 	CopyStateData*  pstate = extInsertDesc->ext_pstate;
 	bool			customFormat = extInsertDesc->ext_pstate->custom;
 
-
 	if(extInsertDesc->ext_noop)
 		return InvalidOid;
 
 
 	/* Open our output file or output stream if not yet open */
-	if(!extInsertDesc->ext_file && !extInsertDesc->ext_noop)
+	if(!extInsertDesc->ext_file &&
+	   !extInsertDesc->ext_noop &&
+	   (extInsertDesc->ext_formatter_data == NULL ||
+	   (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF)))
+	{
 		open_external_writable_source(extInsertDesc);
+	}
 
 	/*
 	 * deconstruct the tuple and format it into text
@@ -730,15 +789,34 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
 		/* must have been created during insert_init */
 		Assert(formatter);
 
-		/* per call formatter prep */
-		FunctionCallPrepareFormatter(&fcinfo,
-									 1,
-									 pstate,
-									 formatter,
-									 extInsertDesc->ext_rel,
-									 extInsertDesc->ext_tupDesc,
-									 pstate->out_functions,
-									 NULL);
+		if ((formatter->fmt_mask & FMT_NEEDEXTBUFF) == 0)
+		{
+			/* per call formatter prep */
+			FunctionCallPrepareFormatter(&fcinfo,
+										 0,
+										 pstate,
+										 formatter,
+										 extInsertDesc->ext_rel,
+										 extInsertDesc->ext_tupDesc,
+										 pstate->out_functions,
+										 NULL,
+										 extInsertDesc->ext_uri,
+										 NULL);
+		}
+		else
+		{
+			/* per call formatter prep */
+			FunctionCallPrepareFormatter(&fcinfo,
+										 1,
+										 pstate,
+										 formatter,
+										 extInsertDesc->ext_rel,
+										 extInsertDesc->ext_tupDesc,
+										 pstate->out_functions,
+										 NULL,
+										 NULL,
+										 NULL);
+		}
 
 		/* Mark the correct record type in the passed tuple */
 		HeapTupleHeaderSetTypeId(instup->t_data, tupDesc->tdtypeid);
@@ -748,18 +826,22 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
 		fcinfo.argnull[0] = false;
 
 		d = FunctionCallInvoke(&fcinfo);
+
 		MemoryContextReset(formatter->fmt_perrow_ctx);
 
-		/* We do not expect a null result */
-		if (fcinfo.isnull)
-			elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
+		if (formatter->fmt_mask & FMT_NEEDEXTBUFF)
+		{
+			/* We do not expect a null result */
+			if (fcinfo.isnull)
+				elog(ERROR, "function %u returned NULL", fcinfo.flinfo->fn_oid);
 
-		b = DatumGetByteaP(d);
+			b = DatumGetByteaP(d);
 
-		CopyOneCustomRowTo(pstate, b);
+			CopyOneCustomRowTo(pstate, b);
+		}
 	}
 
-	if (extInsertDesc->ext_formatter_data == NULL)
+	if (extInsertDesc->ext_formatter_data == NULL || (extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF))
 	{
 		/* Write the data into the external source */
 		external_senddata((URL_FILE*)extInsertDesc->ext_file, pstate);
@@ -769,7 +851,6 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
 		pstate->fe_msgbuf->data[0] = '\0';
 	}
 	pstate->processed++;
-
 	return HeapTupleGetOid(instup);
 }
 
@@ -782,6 +863,28 @@ external_insert(ExternalInsertDesc extInsertDesc, TupleTableSlot *tupTableSlot)
 void
 external_insert_finish(ExternalInsertDesc extInsertDesc)
 {
+	/* Tell formatter to close */
+	if (extInsertDesc->ext_formatter_data != NULL &&
+		(extInsertDesc->ext_formatter_data->fmt_mask & FMT_NEEDEXTBUFF) == 0)
+	{
+		Datum d;
+		FunctionCallInfoData fcinfo;
+
+		extInsertDesc->ext_formatter_data->fmt_mask |= FMT_WRITE_END;
+
+		/* per call formatter prep */
+		FunctionCallPrepareFormatter(&fcinfo,
+									 0,
+									 extInsertDesc->ext_pstate,
+									 extInsertDesc->ext_formatter_data,
+									 NULL,
+									 NULL,
+									 NULL,
+									 NULL,
+									 NULL,
+									 NULL);
+		d = FunctionCallInvoke(&fcinfo);
+	}
 
 	/*
 	 * Close the external source
@@ -805,6 +908,7 @@ external_insert_finish(ExternalInsertDesc extInsertDesc)
 	pfree(extInsertDesc);
 }
 
+
 /* ==========================================================================
  * The following macros aid in major refactoring of data processing code (in
  * externalgettup() ). We use macros because in some cases the code must be in
@@ -1042,102 +1146,102 @@ static DataLineStatus parse_next_line(FileScanDesc scan)
 static HeapTuple
 externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
 {
-		HeapTuple	tuple = NULL;
-		CopyState	pstate = scan->fs_pstate;
-		bool        needData = false;
+	HeapTuple	tuple = NULL;
+	CopyState	pstate = scan->fs_pstate;
+	bool        needData = false;
 
-		/* If we either got things to read or stuff to process */
-		while (!pstate->fe_eof || !pstate->raw_buf_done)
+	/* If we either got things to read or stuff to process */
+	while (!pstate->fe_eof || !pstate->raw_buf_done)
+	{
+		/* need to fill our buffer with data? */
+		if (pstate->raw_buf_done)
 		{
-			/* need to fill our buffer with data? */
-			if (pstate->raw_buf_done)
-			{
-			    pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
-				pstate->begloc = pstate->raw_buf;
-				pstate->raw_buf_done = (pstate->bytesread==0);
-				pstate->raw_buf_index = 0;
+		    pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
+			pstate->begloc = pstate->raw_buf;
+			pstate->raw_buf_done = (pstate->bytesread==0);
+			pstate->raw_buf_index = 0;
 
-				/* on first time around just throw the header line away */
-				if (pstate->header_line && pstate->bytesread > 0)
+			/* on first time around just throw the header line away */
+			if (pstate->header_line && pstate->bytesread > 0)
+			{
+				PG_TRY();
+				{
+					readHeaderLine(pstate);
+				}
+				PG_CATCH();
 				{
-					PG_TRY();
+					/*
+					 * got here? encoding conversion error occurred on the
+					 * header line (first row).
+					 */
+					if (pstate->errMode == ALL_OR_NOTHING)
 					{
-						readHeaderLine(pstate);
+						PG_RE_THROW();
 					}
-					PG_CATCH();
+					else
 					{
+						/* SREH - release error state */
+						if (!elog_dismiss(DEBUG5))
+							PG_RE_THROW(); /* hope to never get here! */
+
 						/*
-						 * got here? encoding conversion error occurred on the
-						 * header line (first row).
+						 * note: we don't bother doing anything special here.
+						 * we are never interested in logging a header line
+						 * error. just continue the workflow.
 						 */
-						if (pstate->errMode == ALL_OR_NOTHING)
-						{
-							PG_RE_THROW();
-						}
-						else
-						{
-							/* SREH - release error state */
-							if (!elog_dismiss(DEBUG5))
-								PG_RE_THROW(); /* hope to never get here! */
-
-							/*
-							 * note: we don't bother doing anything special here.
-							 * we are never interested in logging a header line
-							 * error. just continue the workflow.
-							 */
-						}
 					}
-					PG_END_TRY();
-
-					EXT_RESET_LINEBUF;
-					pstate->header_line = false;
 				}
+				PG_END_TRY();
+
+				EXT_RESET_LINEBUF;
+				pstate->header_line = false;
 			}
+		}
 
-			/* while there is still data in our buffer */
-			while (!pstate->raw_buf_done || needData)
-			{
-				DataLineStatus ret_mode = parse_next_line(scan);
+		/* while there is still data in our buffer */
+		while (!pstate->raw_buf_done || needData)
+		{
+			DataLineStatus ret_mode = parse_next_line(scan);
 
-				if(ret_mode == LINE_OK)
-				{
-					/* convert to heap tuple */
-					/* XXX This is bad code.  Planner should be able to
-					 * decide whether we need heaptuple or memtuple upstream,
-					 * so make the right decision here.
-					 */
-					tuple = heap_form_tuple(scan->fs_tupDesc, scan->values, scan->nulls);
-					pstate->processed++;
-					MemoryContextReset(pstate->rowcontext);
-					return tuple;
-				}
-				else if(ret_mode == LINE_ERROR && !pstate->raw_buf_done)
-				{
-					/* error was handled in parse_next_line. move to the next */
-					continue;
-				}
-				else if(ret_mode == END_MARKER)
-				{
-					scan->fs_inited = false;
-					return NULL;
-				}
-				else
-				{
-					/* try to get more data if possible */
-					Assert((ret_mode == NEED_MORE_DATA) ||
-						   (ret_mode == LINE_ERROR && pstate->raw_buf_done));
-					needData = true;
-					break;
-				}
+			if(ret_mode == LINE_OK)
+			{
+				/* convert to heap tuple */
+				/* XXX This is bad code.  Planner should be able to
+				 * decide whether we need heaptuple or memtuple upstream,
+				 * so make the right decision here.
+				 */
+				tuple = heap_form_tuple(scan->fs_tupDesc, scan->values, scan->nulls);
+				pstate->processed++;
+				MemoryContextReset(pstate->rowcontext);
+				return tuple;
+			}
+			else if(ret_mode == LINE_ERROR && !pstate->raw_buf_done)
+			{
+				/* error was handled in parse_next_line. move to the next */
+				continue;
+			}
+			else if(ret_mode == END_MARKER)
+			{
+				scan->fs_inited = false;
+				return NULL;
+			}
+			else
+			{
+				/* try to get more data if possible */
+				Assert((ret_mode == NEED_MORE_DATA) ||
+					   (ret_mode == LINE_ERROR && pstate->raw_buf_done));
+				needData = true;
+				break;
 			}
 		}
+	}
 
-		/*
-		 * if we got here we finished reading all the data.
-		 */
-		scan->fs_inited = false;
+	/*
+	 * if we got here we finished reading all the data.
+	 */
+	scan->fs_inited = false;
 
-		return NULL;
+	return NULL;
 
 
 }
@@ -1145,149 +1249,241 @@ externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss
 static HeapTuple
 externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
 {
-		HeapTuple   	tuple;
-		CopyState		pstate = scan->fs_pstate;
-		FormatterData*	formatter = scan->fs_formatter;
-		bool			no_more_data = false;
-		MemoryContext 	oldctxt = CurrentMemoryContext;
+	HeapTuple   	tuple;
+	CopyState		pstate = scan->fs_pstate;
+	FormatterData*	formatter = scan->fs_formatter;
+	bool			no_more_data = false;
+	MemoryContext 	oldctxt = CurrentMemoryContext;
 
-		Assert(formatter);
+	Assert(formatter);
 
-		/* while didn't finish processing the entire file */
-		while (!no_more_data)
+	/* while didn't finish processing the entire file */
+	while (!no_more_data)
+	{
+		/* need to fill our buffer with data? */
+		if (pstate->raw_buf_done)
 		{
-			/* need to fill our buffer with data? */
-			if (pstate->raw_buf_done)
-			{
-				int	 bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
-				if ( bytesread > 0 )
-					appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
-				pstate->raw_buf_done = false;
-
-				/* HEADER not yet supported ... */
-				if(pstate->header_line)
-					elog(ERROR, "header line in custom format is not yet supported");
-			}
+			int	 bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
+			if ( bytesread > 0 )
+				appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
+			pstate->raw_buf_done = false;
+
+			/* HEADER not yet supported ... */
+			if(pstate->header_line)
+				elog(ERROR, "header line in custom format is not yet supported");
+		}
 
-			if (formatter->fmt_databuf.len > 0 || !pstate->fe_eof)
+		if (formatter->fmt_databuf.len > 0 || !pstate->fe_eof)
+		{
+			/* while there is still data in our buffer */
+			while (!pstate->raw_buf_done)
 			{
-				/* while there is still data in our buffer */
-				while (!pstate->raw_buf_done)
+				bool	error_caught = false;
+
+				/*
+				 * Invoke the custom formatter function.
+				 */
+				PG_TRY();
 				{
-					bool	error_caught = false;
+					Datum					d;
+					FunctionCallInfoData	fcinfo;
+
+					/* per call formatter prep */
+					FunctionCallPrepareFormatter(&fcinfo,
+												 0,
+												 pstate,
+												 formatter,
+												 scan->fs_rd,
+												 scan->fs_tupDesc,
+												 scan->in_functions,
+												 scan->typioparams,
+												 ((URL_FILE *)(scan->fs_file))->url,
+												 ss);
+					d = FunctionCallInvoke(&fcinfo);
+
+				}
+				PG_CATCH();
+				{
+					error_caught = true;
+
+					MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
 
 					/*
-					 * Invoke the custom formatter function.
+					 * Save any bad row information that was set
+					 * by the user in the formatter UDF (if any).
+					 * Then handle the error in FILEAM_HANDLE_ERROR.
 					 */
-					PG_TRY();
-					{
-						Datum					d;
-						FunctionCallInfoData	fcinfo;
-
-						/* per call formatter prep */
-						FunctionCallPrepareFormatter(&fcinfo,
-													 0,
-													 pstate,
-													 formatter,
-													 scan->fs_rd,
-													 scan->fs_tupDesc,
-													 scan->in_functions,
-													 scan->typioparams);
-						d = FunctionCallInvoke(&fcinfo);
+					pstate->cur_lineno = formatter->fmt_badrow_num;
+					pstate->cur_byteno = formatter->fmt_bytesread;
+					resetStringInfo(&pstate->line_buf);
 
-					}
-					PG_CATCH();
+					if (formatter->fmt_badrow_len > 0)
 					{
-						error_caught = true;
-
-						MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
-
-						/*
-						 * Save any bad row information that was set
-						 * by the user in the formatter UDF (if any).
-						 * Then handle the error in FILEAM_HANDLE_ERROR.
-						 */
-						pstate->cur_lineno = formatter->fmt_badrow_num;
-						pstate->cur_byteno = formatter->fmt_bytesread;
-						resetStringInfo(&pstate->line_buf);
-
-						if (formatter->fmt_badrow_len > 0)
+						if (formatter->fmt_badrow_data)
+							appendBinaryStringInfo(&pstate->line_buf,
+												   formatter->fmt_badrow_data,
+												   formatter->fmt_badrow_len);
+
+						formatter->fmt_databuf.cursor += formatter->fmt_badrow_len;
+						if (formatter->fmt_databuf.cursor > formatter->fmt_databuf.len ||
+							formatter->fmt_databuf.cursor < 0 )
 						{
-							if (formatter->fmt_badrow_data)
-								appendBinaryStringInfo(&pstate->line_buf,
-													   formatter->fmt_badrow_data,
-													   formatter->fmt_badrow_len);
-
-							formatter->fmt_databuf.cursor += formatter->fmt_badrow_len;
-							if (formatter->fmt_databuf.cursor > formatter->fmt_databuf.len ||
-								formatter->fmt_databuf.cursor < 0 )
-							{
-								formatter->fmt_databuf.cursor = formatter->fmt_databuf.len;
-							}
+							formatter->fmt_databuf.cursor = formatter->fmt_databuf.len;
 						}
-
-						FILEAM_HANDLE_ERROR;
-
-						MemoryContextSwitchTo(oldctxt);
 					}
-					PG_END_TRY();
 
-					/*
-					 * Examine the function results. If an error was caught
-					 * we already handled it, so after checking the reject
-					 * limit, loop again and call the UDF for the next tuple.
-					 */
-					if (!error_caught)
+					FILEAM_HANDLE_ERROR;
+
+					MemoryContextSwitchTo(oldctxt);
+				}
+				PG_END_TRY();
+
+				/*
+				 * Examine the function results. If an error was caught
+				 * we already handled it, so after checking the reject
+				 * limit, loop again and call the UDF for the next tuple.
+				 */
+				if (!error_caught)
+				{
+					switch (formatter->fmt_notification)
 					{
-						switch (formatter->fmt_notification)
-						{
-							case FMT_NONE:
+						case FMT_NONE:
 
-								/* got a tuple back */
+							/* got a tuple back */
 
-								tuple = formatter->fmt_tuple;
-								pstate->processed++;
-								MemoryContextReset(formatter->fmt_perrow_ctx);
+							tuple = formatter->fmt_tuple;
+							pstate->processed++;
+							MemoryContextReset(formatter->fmt_perrow_ctx);
 
-								return tuple;
+							return tuple;
 
-							case FMT_NEED_MORE_DATA:
+						case FMT_NEED_MORE_DATA:
 
-								/*
-								 * Callee consumed all data in the buffer.
-								 * Prepare to read more data into it.
-								 */
-								pstate->raw_buf_done = true;
-								justifyDatabuf(&formatter->fmt_databuf);
+							/*
+							 * Callee consumed all data in the buffer.
+							 * Prepare to read more data into it.
+							 */
+							pstate->raw_buf_done = true;
+							justifyDatabuf(&formatter->fmt_databuf);
 
-								continue;
+							continue;
 
-							default:
-								elog(ERROR, "unsupported formatter notification (%d)",
-											formatter->fmt_notification);
-								break;
-						}
-					}
-					else
-					{
-						FILEAM_IF_REJECT_LIMIT_REACHED_ABORT
+						default:
+							elog(ERROR, "unsupported formatter notification (%d)",
+										formatter->fmt_notification);
+							break;
 					}
-
 				}
-			}
-			else
-			{
-				no_more_data = true;
+				else
+				{
+					FILEAM_IF_REJECT_LIMIT_REACHED_ABORT
+				}
 			}
 		}
+		else
+		{
+			no_more_data = true;
+		}
+	}
+
+	/*
+	 * if we got here we finished reading all the data.
+	 */
+	Assert(no_more_data);
+	scan->fs_inited = false;
+
+	return NULL;
+}
+
+static HeapTuple
+externalgettup_custom_noextprot(FileScanDesc scan,
+								ExternalSelectDesc desc,
+								ScanState *ss)
+{
+	HeapTuple   	tuple;
+	CopyState		pstate = scan->fs_pstate;
+	FormatterData*	formatter = scan->fs_formatter;
+	bool			no_more_data = false;
+	MemoryContext 	oldctxt = CurrentMemoryContext;
+
+	Assert(formatter);
+
+	/* while didn't finish processing the entire file */
+	while (!no_more_data)
+	{
+		bool error_caught = false;
 
 		/*
-		 * if we got here we finished reading all the data.
+		 * Invoke the custom formatter function.
 		 */
-		Assert(no_more_data);
-		scan->fs_inited = false;
+		PG_TRY();
+		{
+			Datum					d;
+			FunctionCallInfoData	fcinfo;
+
+			/* per call formatter prep */
+			FunctionCallPrepareFormatter(&fcinfo,
+										 0,
+										 pstate,
+										 formatter,
+										 scan->fs_rd,
+										 scan->fs_tupDesc,
+										 scan->in_functions,
+										 scan->typioparams,
+										 scan->fs_uri,
+										 ss);
+			d = FunctionCallInvoke(&fcinfo);
 
-		return NULL;
+		}
+		PG_CATCH();
+		{
+			error_caught = true;
+			MemoryContextSwitchTo(formatter->fmt_perrow_ctx);
+			FILEAM_HANDLE_ERROR;
+			MemoryContextSwitchTo(oldctxt);
+		}
+		PG_END_TRY();
+
+		if (!error_caught)
+		{
+			switch (formatter->fmt_notification)
+			{
+				case FMT_NONE:
+				{
+					/* got a tuple back */
+					tuple = formatter->fmt_tuple;
+					pstate->processed++;
+					MemoryContextReset(formatter->fmt_perrow_ctx);
+					return tuple;
+				}
+				case FMT_DONE:
+				{
+					no_more_data = true;
+					break;
+				}
+				default:
+				{
+					elog(ERROR, "unsupported formatter notification (%d)",
+								formatter->fmt_notification);
+					break;
+				}
+			}
+		}
+		else
+		{
+			ereport(ERROR,
+					(errcode(ERRCODE_EXTERNAL_ROUTINE_INVOCATION_EXCEPTION),
+					 errmsg("formatter reported error"),
+					 errOmitLocation(true)));
+		}
+	}
+
+	/*
+	 * if we got here we finished reading all the data.
+	 */
+	Assert(no_more_data);
+	scan->fs_inited = false;
+	return NULL;
 }
 
 /* ----------------
@@ -1322,11 +1518,19 @@ externalgettup(FileScanDesc scan,
 		/* (set current state...) */
 	}
 
+	/***********************************************************
+	 * In this version, a custom formatter and fs are always defined.
+	 ***********************************************************/
 	if (!custom)
-		return externalgettup_defined(scan, desc, ss); /* text/csv */
+		return externalgettup_defined(scan, desc, ss); /* text/csv */
+	else if (scan->fs_formatter->fmt_mask & FMT_NEEDEXTBUFF)
+	{
+		return externalgettup_custom(scan, desc, ss);
+	}
 	else
-		return externalgettup_custom(scan, desc, ss);  /* custom   */
-
+	{
+		return externalgettup_custom_noextprot(scan, desc, ss);
+	}
 }
 /*
  * setCustomFormatter
@@ -1346,7 +1550,23 @@ lookupCustomFormatter(char *formatter_name, bool iswritable)
 		Oid		argList[1];
 		Oid		returnOid;
 
-		funcname = lappend(funcname, makeString(formatter_name));
+		char*   new_formatter_name = (char *)palloc0(strlen(formatter_name) + 5);
+		if (!iswritable)
+		{
+			sprintf(new_formatter_name, "%s_in", formatter_name);
+		}
+		else
+		{
+			sprintf(new_formatter_name, "%s_out", formatter_name);
+		}
+
+		/* convert the name to all lowercase */
+		for (int i = 0 ; new_formatter_name[i] != '\0' ; ++i)
+		{
+			new_formatter_name[i] = tolower(new_formatter_name[i]);
+		}
+
+		funcname = lappend(funcname, makeString(new_formatter_name));
 
 		if(iswritable)
 		{
@@ -1363,7 +1583,7 @@ lookupCustomFormatter(char *formatter_name, bool iswritable)
 		if (!OidIsValid(procOid))
 			ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
 							errmsg("formatter function %s of type %s was not found.",
-									formatter_name,
+									new_formatter_name,
 									(iswritable ? "writable" : "readable")),
 							errhint("Create it with CREATE FUNCTION."),
 							errOmitLocation(true)));
@@ -1372,7 +1592,7 @@ lookupCustomFormatter(char *formatter_name, bool iswritable)
 		if (get_func_rettype(procOid) != returnOid)
 			ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
 							errmsg("formatter function %s of type %s has an incorrect return type",
-									formatter_name,
+									new_formatter_name,
 									(iswritable ? "writable" : "readable")),
 							errOmitLocation(true)));
 
@@ -1381,9 +1601,12 @@ lookupCustomFormatter(char *formatter_name, bool iswritable)
 			ereport(ERROR,
 					(errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
 					 errmsg("formatter function %s is not declared STABLE.",
-							 formatter_name),
+							 new_formatter_name),
 					 errOmitLocation(true)));
 
+		pfree(new_formatter_name);
+
 		return procOid;
 }
 
@@ -1664,7 +1887,9 @@ FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
 							 Relation 				rel,
 							 TupleDesc 				tupDesc,
 							 FmgrInfo			   *convFuncs,
-							 Oid                   *typioparams)
+							 Oid                   *typioparams,
+							 char				   *url,
+							 ScanState			   *ss)
 {
 	formatter->type = T_FormatterData;
 	formatter->fmt_relation = rel;
@@ -1680,6 +1905,8 @@ FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
 	formatter->fmt_needs_transcoding = pstate->need_transcoding;
 	formatter->fmt_conversion_proc = pstate->enc_conversion_proc;
 	formatter->fmt_external_encoding = pstate->client_encoding;
+	formatter->fmt_url = url;
+	formatter->fmt_splits = ss == NULL ? NULL : ss->splits;
 
 	InitFunctionCallInfoData(/* FunctionCallInfoData */ *fcinfo,
 							 /* FmgrInfo */ pstate->custom_formatter_func,
@@ -1688,7 +1915,6 @@ FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
 							 /* ResultSetInfo */ NULL);
 }
 
-
 /*
  * open the external source for scanning (RET only)
  *
@@ -2360,6 +2586,48 @@ strtokx2(const char *s,
 	return start;
 }
 
+bool hasErrTblInFmtOpts(List *fmtOpts)
+{
+	char	*format_str = pstrdup((char *) strVal(linitial(fmtOpts)));
+	const	char *whitespace = " \t\n\r";
+	int	encoding = GetDatabaseEncoding();
+	char *key = strtokx2(format_str, whitespace, NULL, NULL,
+						 0, false, true, encoding);
+	while (key) {
+		if (pg_strcasecmp(key, "err_table") == 0)
+			return true;
+		key = strtokx2(NULL, whitespace, NULL, NULL,
+					   0, false, false, encoding);
+	}
+	return false;
+}
+
+char *getExtTblCategoryInFmtOptsStr(char *fmtStr)
+{
+	const char	*whitespace = " \t\n\r";
+	const char	*quote = "'";
+	int			encoding = GetDatabaseEncoding();
+
+	char *key = strtokx2(fmtStr, whitespace, NULL, NULL,
+	                     0, false, true, encoding);
+	char *val = strtokx2(NULL, whitespace, NULL, quote,
+	                     0, false, true, encoding);
+
+	while (key && val)
+	{
+		if (pg_strncasecmp(key, "category", strlen("category")) == 0)
+		{
+			return pstrdup(val);
+		}
+
+		key = strtokx2(NULL, whitespace, NULL, NULL,
+		               0, false, false, encoding);
+		val = strtokx2(NULL, whitespace, NULL, quote,
+		               0, false, true, encoding);
+	}
+
+	return NULL;
+}
+
 char *getExtTblFormatterTypeInFmtOptsStr(char *fmtStr)
 {
 	const char	*whitespace = " \t\n\r";

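One behavioral change in fileam.c worth calling out: lookupCustomFormatter() no longer looks up the FORMAT name verbatim; it derives a function name from it, so FORMAT 'orc' on a readable table resolves a UDF named orc_in, and on a writable table orc_out. A standalone sketch of that derivation (illustrative only, not HAWQ code):

    #include <ctype.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>

    /* Derive the formatter UDF name the way lookupCustomFormatter() does:
     * append "_in" for readable tables or "_out" for writable ones, then
     * lower-case the result before the catalog lookup. */
    static char *
    formatter_func_name(const char *formatter_name, int iswritable)
    {
        size_t len = strlen(formatter_name) + 5;   /* "_out" + '\0' */
        char *buf = malloc(len);
        snprintf(buf, len, "%s%s", formatter_name,
                 iswritable ? "_out" : "_in");
        for (char *p = buf; *p; p++)
            *p = (char) tolower((unsigned char) *p);
        return buf;   /* caller frees */
    }
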
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/access/external/plugstorage.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/plugstorage.c b/src/backend/access/external/plugstorage.c
index ad7d260..fe7c82a 100644
--- a/src/backend/access/external/plugstorage.c
+++ b/src/backend/access/external/plugstorage.c
@@ -479,24 +479,33 @@ void InvokePlugStorageFormatStopScan(FmgrInfo *func,
 ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func,
 		                                             Relation relation,
                                                      int formatterType,
-                                                     char *formatterName)
+                                                     char *formatterName,
+                                                     PlannedStmt *plannedstmt,
+                                                     int segno)
 {
 	PlugStorageData psdata;
 	FunctionCallInfoData fcinfo;
 
-	psdata.type               = T_PlugStorageData;
-	psdata.ps_relation        = relation;
-	psdata.ps_formatter_type  = formatterType;
-	psdata.ps_formatter_name  = formatterName;
+	psdata.type = T_PlugStorageData;
+	psdata.ps_relation = relation;
+	psdata.ps_formatter_type = formatterType;
+	psdata.ps_formatter_name = formatterName;
+	psdata.ps_segno = segno;
 
-	InitFunctionCallInfoData(fcinfo,
-	                         func,
-	                         0,
-	                         (Node *)(&psdata),
-	                         NULL);
 
+	psdata.ps_scan_state = palloc0(sizeof(ScanState));
+
+	InitFunctionCallInfoData(fcinfo,  // FunctionCallInfoData
+	                         func,    // FmgrInfo
+	                         0,       // nArgs
+	                         (Node *)(&psdata),  // Call Context
+	                         NULL);              // ResultSetInfo
+
+	// Invoke function
 	FunctionCallInvoke(&fcinfo);
 
+
+	// We do not expect a null result
 	if (fcinfo.isnull)
 	{
 		elog(ERROR, "function %u returned NULL",
@@ -505,6 +514,7 @@ ExternalInsertDesc InvokePlugStorageFormatInsertInit(FmgrInfo *func,
 
 	ExternalInsertDesc extInsertDesc = psdata.ps_ext_insert_desc;
 
+	pfree(psdata.ps_scan_state);
 	return extInsertDesc;
 }
 

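For reference, the formatter side of InvokePlugStorageFormatInsertInit() receives its arguments through the PlugStorageData node passed as the function call context and returns the descriptor the same way. A hypothetical stub follows; the ps_* field names come from the code above, while myformat_insert_init() and create_my_insert_desc() are illustrative names, not part of this commit.

    PG_FUNCTION_INFO_V1(myformat_insert_init);

    Datum
    myformat_insert_init(PG_FUNCTION_ARGS)
    {
        /* Arguments arrive via the call context, not as regular args. */
        PlugStorageData *psdata = (PlugStorageData *) fcinfo->context;
        Relation rel   = psdata->ps_relation;
        int      segno = psdata->ps_segno;

        /* Build whatever per-format write state is needed and hand the
         * descriptor back through the same node. */
        psdata->ps_ext_insert_desc = create_my_insert_desc(rel, segno);

        PG_RETURN_VOID();
    }
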
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/catalog/cdb_external_extensions.sql
----------------------------------------------------------------------
diff --git a/src/backend/catalog/cdb_external_extensions.sql b/src/backend/catalog/cdb_external_extensions.sql
index e3a8080..d11797f 100644
--- a/src/backend/catalog/cdb_external_extensions.sql
+++ b/src/backend/catalog/cdb_external_extensions.sql
@@ -47,3 +47,15 @@ LANGUAGE C STABLE;
 CREATE OR REPLACE FUNCTION fixedwidth_out(record) RETURNS bytea 
 AS '$libdir/fixedwidth.so', 'fixedwidth_out'
 LANGUAGE C STABLE;
+
+------------------------------------------------------------------
+-- external HDFS
+------------------------------------------------------------------
+CREATE OR REPLACE FUNCTION hdfs_validate() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_validate'
+LANGUAGE C STABLE;
+
+CREATE OR REPLACE FUNCTION hdfs_blocklocation() RETURNS void
+AS '$libdir/exthdfs.so', 'hdfsprotocol_blocklocation'
+LANGUAGE C STABLE;
+

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/catalog/heap.c
----------------------------------------------------------------------
diff --git a/src/backend/catalog/heap.c b/src/backend/catalog/heap.c
index 18a8acf..e159c83 100644
--- a/src/backend/catalog/heap.c
+++ b/src/backend/catalog/heap.c
@@ -47,7 +47,11 @@
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+#include "port.h"
 
+#include "access/fileam.h"
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/sysattr.h"
@@ -78,38 +82,42 @@
 #include "catalog/pg_statistic.h"
 #include "catalog/pg_tablespace.h"
 #include "catalog/pg_type.h"
-#include "cdb/cdbappendonlyam.h"
-#include "cdb/cdbpartition.h"
+
 #include "cdb/cdbanalyze.h"
+#include "cdb/cdbappendonlyam.h"
+#include "cdb/cdbmirroredfilesysobj.h"
 #include "cdb/cdbparquetfooterprocessor.h"
+#include "cdb/cdbparquetstoragewrite.h"
+#include "cdb/cdbpartition.h"
+#include "cdb/cdbpersistentfilesysobj.h"
+#include "cdb/cdbsharedstorageop.h"
+#include "cdb/cdbvars.h"
 #include "commands/dbcommands.h"
 #include "commands/tablecmds.h"
 #include "commands/tablespace.h"
-#include "miscadmin.h"
+
 #include "nodes/makefuncs.h"
+#include "nodes/pg_list.h"
+#include "nodes/value.h"
 #include "optimizer/clauses.h"
 #include "optimizer/var.h"
 #include "parser/parse_coerce.h"
 #include "parser/parse_expr.h"
 #include "parser/parse_relation.h"
+#include "pg_config_manual.h"
+#include "storage/fd.h"
 #include "storage/smgr.h"
+#include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
+#include "utils/guc.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"             /* CDB: GetMemoryChunkContext */
+#include "utils/palloc.h"
 #include "utils/relcache.h"
 #include "utils/syscache.h"
-#include "utils/guc.h"
-#include "cdb/cdbvars.h"
-
-#include "cdb/cdbsharedstorageop.h"
-#include "cdb/cdbmirroredfilesysobj.h"
-#include "cdb/cdbpersistentfilesysobj.h"
-#include "cdb/cdbparquetstoragewrite.h"
-#include "catalog/gp_persistent.h"
-
-#include "utils/guc.h"
+#include "utils/uri.h"
 
 typedef struct pg_result PGresult;
 extern void PQclear(PGresult *res);
@@ -2544,7 +2552,52 @@ heap_drop_with_catalog(Oid relid)
 	 * External table? If so, delete the pg_exttable tuple.
 	 */
 	if (is_external_rel)
+	{
+		/* Step 1. Remove the table's URI on the file system. */
+		rel = relation_open(relid, AccessExclusiveLock);
+		ExtTableEntry *exttbl = GetExtTableEntry(rel->rd_id);
+		char *path = (char *) strVal(linitial(exttbl->locations));
+		char *searchKey = (char *) palloc0(MAXPGPATH);
+		char *fileSpacePath = NULL;
+		GetFilespacePathForTablespace(get_database_dts(MyDatabaseId),
+		                              &fileSpacePath);
+		snprintf(searchKey, MAXPGPATH, "%s/ExtErrTbl/", fileSpacePath);
+		char *match = strstr(path,searchKey);
+		if (match)
+		{
+			RemovePath(path, 1);
+		}
+
+		/* Get category for the external table */
+		List *entry_locations = exttbl->locations;
+		Assert(entry_locations);
+		ListCell *entry_location = list_head(entry_locations);
+		char *url = ((Value*)lfirst(entry_location))->val.str;
+		char *category = getExtTblCategoryInFmtOptsStr(exttbl->fmtopts);
+
+		/* Remove data for internal table */
+		if (category != NULL &&
+		    pg_strncasecmp(category, "internal", strlen("internal")) == 0)
+		{
+			if (IS_HDFS_URI(url))   /* ORC, TEXT, CSV */
+			{
+				/* orc, text, and csv support only one location. */
+				Assert(list_length(entry_locations) == 1);
+				RemovePath(url, 1);
+			}
+		}
+
+		if (category)
+		{
+			pfree(category);
+		}
+		relation_close(rel, AccessExclusiveLock);
+
+		/* Step 2. remove pg_exttable entry */
 		RemoveExtTableEntry(relid);
+	}
 
 	if (is_foreign_rel)
 		RemoveForeignTableEntry(relid);
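
To make the drop-time cleanup above concrete, here is a hedged sketch with
hypothetical values (the fmtopts layout, URL, and paths are assumptions;
the helpers are the ones called in the hunk):

    /* Dropping an external table whose format options carry
     * category 'internal' also removes its HDFS data directory; any
     * location under the filespace's ExtErrTbl/ prefix is removed as an
     * error table. */
    char *fmtopts  = "formatter 'text' delimiter '|' category 'internal'";
    char *category = getExtTblCategoryInFmtOptsStr(fmtopts); /* "internal" */
    char *url      = "hdfs://namenode:8020/hawq_default/ext_orc_tbl";
    if (category != NULL &&
        pg_strncasecmp(category, "internal", strlen("internal")) == 0 &&
        IS_HDFS_URI(url))
        RemovePath(url, 1); /* recursively removes the directory */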

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/cdb/cdbdatalocality.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index b451f68..f7d4472 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -29,14 +29,17 @@
 
 #include "access/genam.h"
 #include "access/aomd.h"
+#include "access/extprotocol.h"
 #include "access/heapam.h"
 #include "access/filesplit.h"
 #include "access/parquetsegfiles.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/catquery.h"
 #include "catalog/pg_exttable.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_proc.h"
+#include "catalog/pg_extprotocol.h"
 #include "cdb/cdbdatalocality.h"
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
@@ -52,6 +55,7 @@
 #include "optimizer/planmain.h"
 #include "parser/parsetree.h"
 #include "storage/fd.h"
+#include "parser/parse_func.h"
 #include "postmaster/identity.h"
 #include "cdb/cdbmetadatacache.h"
 #include "resourcemanager/utils/network_utils.h"
@@ -87,6 +91,11 @@ typedef struct range_table_collector_context {
 	List *full_range_tables; /* every table include result relation  */
 } range_table_collector_context;
 
+typedef struct collect_scan_rangetable_context {
+	plan_tree_base_prefix base;
+	List *range_tables;      /* range tables referenced by scan nodes only */
+	List *full_range_tables; /* the full range table list */
+} collect_scan_rangetable_context;
+
 /*
  * structure containing information about how much a
  * host holds.
@@ -123,10 +132,14 @@ typedef struct File_Split {
 	int64 logiceof;
 	int host;
 	bool is_local_read;
+	char *ext_file_uri;
 } File_Split;
 
 typedef enum DATALOCALITY_RELATION_TYPE {
-	DATALOCALITY_APPENDONLY, DATALOCALITY_PARQUET, DATALOCALITY_UNKNOWN
+	DATALOCALITY_APPENDONLY,
+	DATALOCALITY_PARQUET,
+	DATALOCALITY_HDFS,
+	DATALOCALITY_UNKNOWN
 } DATALOCALITY_RELATION_TYPE;
 
 /*
@@ -290,6 +303,7 @@ typedef struct hostname_volume_stat_context {
  */
 typedef struct split_to_segment_mapping_context {
 	range_table_collector_context rtc_context;
+	collect_scan_rangetable_context srtc_context;
 	data_dist_stat_context dds_context;
 	collect_hdfs_split_location_context chsl_context;
 	hostname_volume_stat_context host_context;
@@ -315,9 +329,9 @@ typedef struct split_to_segment_mapping_context {
 
 	int64 total_metadata_logic_len;
 
-    int metadata_cache_time_us;
-    int alloc_resource_time_us;
-    int cal_datalocality_time_us;
+	int metadata_cache_time_us;
+	int alloc_resource_time_us;
+	int cal_datalocality_time_us;
 } split_to_segment_mapping_context;
 
 typedef struct vseg_list{
@@ -331,14 +345,19 @@ static void init_datalocality_memory_context(void);
 static void init_split_assignment_result(Split_Assignment_Result *result,
 		int host_num);
 
-static void init_datalocality_context(split_to_segment_mapping_context *context);
+static void init_datalocality_context(PlannedStmt *plannedstmt,
+		split_to_segment_mapping_context *context);
 
 static bool range_table_collector_walker(Node *node,
 		range_table_collector_context *context);
 
-static void collect_range_tables(Query *query, List* full_range_table,
+static void collect_range_tables(Query *query,
 		range_table_collector_context *context);
 
+static bool collect_scan_rangetable(Node *node,
+		collect_scan_rangetable_context *cxt);
+
 static void convert_range_tables_to_oids_and_check_table_functions(List **range_tables, bool* isUDFExists,
 		MemoryContext my_memorycontext);
 
@@ -349,6 +368,8 @@ static void check_keep_hash_and_external_table(
 static int64 get_block_locations_and_claculte_table_size(
 		split_to_segment_mapping_context *collector_context);
 
+static bool dataStoredInHdfs(Relation rel);
+
 static List *get_virtual_segments(QueryResource *resource);
 
 static List *run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, QueryResource ** resourcePtr,
@@ -368,6 +389,12 @@ static void ParquetGetSegFileDataLocation(Relation relation,
 		Relation_Data *rel_data, int* hitblocks,
 		int* allblocks, GpPolicy *targetPolicy);
 
+static void ExternalGetHdfsFileDataLocation(Relation relation,
+		split_to_segment_mapping_context *context, int64 splitsize,
+		Relation_Data *rel_data, int* allblocks);
+
+Oid LookupCustomProtocolBlockLocationFunc(char *protoname);
+
 static BlockLocation *fetch_hdfs_data_block_location(char *filepath, int64 len,
 		int *block_num, RelFileNode rnode, uint32_t segno, double* hit_ratio);
 
@@ -464,6 +491,9 @@ static Relation_File** change_file_order_based_on_continuity(
 static int64 set_maximum_segment_volume_parameter(Relation_Data *rel_data,
 		int host_num, double* maxSizePerSegment);
 
+static void InvokeHDFSProtocolBlockLocation(Oid    procOid,
+                                            List  *locs,
+                                            List **blockLocations);
 /*
  * Setup /cleanup the memory context for this run
  * of data locality algorithm.
@@ -500,13 +530,17 @@ static void init_split_assignment_result(Split_Assignment_Result *result,
 	return;
 }
 
-static void init_datalocality_context(split_to_segment_mapping_context *context) {
+static void init_datalocality_context(PlannedStmt *plannedstmt,
+		split_to_segment_mapping_context *context) {
 	context->old_memorycontext = CurrentMemoryContext;
 	context->datalocality_memorycontext = DataLocalityMemoryContext;
 
 	context->chsl_context.relations = NIL;
 	context->rtc_context.range_tables = NIL;
-	context->rtc_context.full_range_tables = NIL;
+	context->rtc_context.full_range_tables = plannedstmt->rtable;
+	context->srtc_context.range_tables = NIL;
+	context->srtc_context.full_range_tables = plannedstmt->rtable;
+	context->srtc_context.base.node = (Node *)plannedstmt;
 
 	context->externTableForceSegNum = 0;
 	context->externTableLocationSegNum = 0;
@@ -592,7 +626,7 @@ static bool range_table_collector_walker(Node *node,
  * collect_range_tables: collect all range table relations, and store
  * them into the range_table_collector_context.
  */
-static void collect_range_tables(Query *query, List* full_range_table,
+static void collect_range_tables(Query *query,
 		range_table_collector_context *context) {
 
 	query_tree_walker(query, range_table_collector_walker, (void *) context,
@@ -613,9 +647,27 @@ static void collect_range_tables(Query *query, List* full_range_table,
 		}
 		context->range_tables = new_range_tables;
 	}
-	context->full_range_tables = full_range_table;
 	return;
 }
+
+static bool collect_scan_rangetable(Node *node,
+		collect_scan_rangetable_context *cxt) {
+	if (NULL == node) return false;
+
+	switch (nodeTag(node)) {
+	case T_ExternalScan:
+	case T_AppendOnlyScan:
+	case T_ParquetScan: {
+		RangeTblEntry *rte = rt_fetch(((Scan *)node)->scanrelid,
+		                              cxt->full_range_tables);
+		cxt->range_tables = lappend(cxt->range_tables, rte);
+		break;
+	}
+	default:
+		break;
+	}
+
+	return plan_tree_walker(node, collect_scan_rangetable, cxt);
+}
 /*
  *
  */
@@ -750,6 +802,19 @@ static void check_keep_hash_and_external_table(
 					if (uri && uri->protocol == URI_CUSTOM && is_pxf_protocol(uri)) {
 						isPxf = true;
 					}
+					else if (uri && (is_hdfs_protocol(uri))) {
+						relation_close(rel, AccessShareLock);
+						if (targetPolicy->nattrs > 0)
+						{
+							/*select the maximum hash bucket number as hashSegNum of query*/
+							if (context->hashSegNum < targetPolicy->bucketnum)
+							{
+								context->hashSegNum = targetPolicy->bucketnum;
+								context->keep_hash = true;
+							}
+						}
+						continue;
+					}
 				}
 			}
 			if (extEnrty->command || isPxf) {
@@ -844,37 +909,14 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
 		/*
 		 * We only consider the data stored in HDFS.
 		 */
-		if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
-			Relation_Data *rel_data = NULL;
-			/*
-			 * Get pg_appendonly information for this table.
-			 */
-			AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
-
-			rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
+		bool isDataStoredInHdfs = dataStoredInHdfs(rel);
+		if (isDataStoredInHdfs) {
+			GpPolicy *targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
+			Relation_Data *rel_data = (Relation_Data *) palloc(sizeof(Relation_Data));
 			rel_data->relid = rel_oid;
 			rel_data->files = NIL;
 			rel_data->partition_parent_relid = 0;
 			rel_data->block_count = 0;
-
-			GpPolicy *targetPolicy = NULL;
-			targetPolicy = GpPolicyFetch(CurrentMemoryContext, rel_oid);
-			/*
-			 * Based on the pg_appendonly information, calculate the data
-			 * location information associated with this relation.
-			 */
-			if (RelationIsAoRows(rel)) {
-				rel_data->type = DATALOCALITY_APPENDONLY;
-				AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
-						aoEntry->splitsize, rel_data, &hitblocks,
-						&allblocks, targetPolicy);
-			} else {
-				rel_data->type = DATALOCALITY_PARQUET;
-				ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
-						context->split_size, rel_data, &hitblocks,
-						&allblocks, targetPolicy);
-			}
-
 			bool isResultRelation = true;
 			ListCell *nonResultlc;
 			foreach(nonResultlc, context->rtc_context.range_tables)
@@ -884,6 +926,54 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
 					isResultRelation = false;
 				}
 			}
+
+			if (!isResultRelation) {
+				/* Skip relations that appear in no scan node: this is the
+				 * partition-table scan optimization. */
+				ListCell *rtc;
+				bool found = false;
+				foreach(rtc, context->srtc_context.range_tables) {
+					RangeTblEntry *rte = lfirst(rtc);
+					if (rel_oid == rte->relid) {
+						found = true;
+						break;
+					}
+				}
+				if (!found) {
+					relation_close(rel, AccessShareLock);
+					continue;
+				}
+			}
+
+			if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
+				/*
+				 * Get pg_appendonly information for this table.
+				 */
+				AppendOnlyEntry *aoEntry = GetAppendOnlyEntry(rel_oid, SnapshotNow);
+				/*
+				 * Based on the pg_appendonly information, calculate the data
+				 * location information associated with this relation.
+				 */
+				if (RelationIsAoRows(rel)) {
+					rel_data->type = DATALOCALITY_APPENDONLY;
+					AOGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
+							aoEntry->splitsize, rel_data, &hitblocks,
+							&allblocks, targetPolicy);
+				} else {
+					rel_data->type = DATALOCALITY_PARQUET;
+					ParquetGetSegFileDataLocation(rel, aoEntry, ActiveSnapshot, context,
+							context->split_size, rel_data, &hitblocks,
+							&allblocks, targetPolicy);
+				}
+			} else if (RelationIsExternal(rel)) {
+				/* External table on HDFS (guaranteed by the enclosing
+				 * isDataStoredInHdfs check): the metadata cache cannot be
+				 * used, so hitblocks always stays 0. */
+				rel_data->type = DATALOCALITY_HDFS;
+				ExternalGetHdfsFileDataLocation(rel, context, context->split_size,
+				                                rel_data, &allblocks);
+			}
+
 			if (!isResultRelation) {
 				total_size += rel_data->total_size;
 				totalFileCount += list_length(rel_data->files);
@@ -915,8 +1005,7 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
 	}
 	context->total_file_count = totalFileCount;
 	context->total_size = total_size;
-    
-    context->metadata_cache_time_us = eclaspeTime;
+	context->metadata_cache_time_us = eclaspeTime;
 
 	if(debug_datalocality_time){
 		elog(LOG, "metadata overall execution time: %d us. \n", eclaspeTime);
@@ -924,6 +1013,25 @@ int64 get_block_locations_and_claculte_table_size(split_to_segment_mapping_conte
 	return total_size;
 }
 
+bool dataStoredInHdfs(Relation rel) {
+	if (RelationIsAoRows(rel) || RelationIsParquet(rel)) {
+		return true;
+	} else if (RelationIsExternal(rel)) {
+		ExtTableEntry* extEntry = GetExtTableEntry(rel->rd_id);
+		bool isHdfsProtocol = false;
+		if (!extEntry->command && extEntry->locations) {
+			char* first_uri_str = (char *) strVal(lfirst(list_head(extEntry->locations)));
+			if (first_uri_str) {
+				Uri* uri = ParseExternalTableUri(first_uri_str);
+				if (uri && is_hdfs_protocol(uri)) {
+					isHdfsProtocol = true;
+				}
+			}
+		}
+		return isHdfsProtocol;
+	}
+	return false;
+}
 /*
  * search_host_in_stat_context: search a host name in the statistic
  * context; if not found, create a new one.
@@ -1579,7 +1687,237 @@ static void ParquetGetSegFileDataLocation(Relation relation,
 	return;
 }
 
+static void InvokeHDFSProtocolBlockLocation(Oid    procOid,
+                                            List  *locs,
+                                            List **blockLocations)
+{
+	ExtProtocolValidatorData   *validator_data;
+	FmgrInfo				   *validator_udf;
+	FunctionCallInfoData		fcinfo;
+
+	validator_data = (ExtProtocolValidatorData *)
+					 palloc0 (sizeof(ExtProtocolValidatorData));
+	validator_udf = palloc(sizeof(FmgrInfo));
+	fmgr_info(procOid, validator_udf);
+
+	validator_data->type 		= T_ExtProtocolValidatorData;
+	validator_data->url_list 	= locs;
+	validator_data->format_opts = NULL;
+	validator_data->errmsg		= NULL;
+	validator_data->direction 	= EXT_VALIDATE_READ;
+	validator_data->action		= EXT_VALID_ACT_GETBLKLOC;
+
+	InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
+							 /* FmgrInfo */ validator_udf,
+							 /* nArgs */ 0,
+							 /* Call Context */ (Node *) validator_data,
+							 /* ResultSetInfo */ NULL);
+
+	/* invoke validator. if this function returns - validation passed */
+	FunctionCallInvoke(&fcinfo);
+
+	ExtProtocolBlockLocationData *bls =
+		(ExtProtocolBlockLocationData *)(fcinfo.resultinfo);
+	/* debug output block location. */
+	if (bls != NULL)
+	{
+		ListCell *c;
+		int i = 0, j = 0;
+		foreach(c, bls->files)
+		{
+			blocklocation_file *blf = (blocklocation_file *)(lfirst(c));
+			elog(DEBUG3, "DEBUG LOCATION for %s with %d blocks",
+					     blf->file_uri, blf->block_num);
+			for ( i = 0 ; i < blf->block_num ; ++i )
+			{
+				BlockLocation *pbl = &(blf->locations[i]);
+				elog(DEBUG3, "DEBUG LOCATION for block %d : %d, "
+						     INT64_FORMAT ", " INT64_FORMAT ", %d",
+						     i,
+						     pbl->corrupt, pbl->length, pbl->offset,
+							 pbl->numOfNodes);
+				for ( j = 0 ; j < pbl->numOfNodes ; ++j )
+				{
+					elog(DEBUG3, "DEBUG LOCATION for block %d : %s, %s, %s",
+							     i,
+							     pbl->hosts[j], pbl->names[j],
+								 pbl->topologyPaths[j]);
+				}
+			}
+		}
+	}
 
+	elog(DEBUG3, "after invoking get block location API");
+
+	/* get location data from fcinfo.resultinfo. */
+	if (bls != NULL)
+	{
+		Assert(bls->type == T_ExtProtocolBlockLocationData);
+		while(list_length(bls->files) > 0)
+		{
+			void *v = lfirst(list_head(bls->files));
+			bls->files = list_delete_first(bls->files);
+			*blockLocations = lappend(*blockLocations, v);
+		}
+	}
+	pfree(validator_data);
+	pfree(validator_udf);
+}
+
+Oid
+LookupCustomProtocolBlockLocationFunc(char *protoname)
+{
+	List*	funcname 	= NIL;
+	Oid		procOid		= InvalidOid;
+	Oid		argList[1];
+	Oid		returnOid;
+
+	char*   new_func_name = (char *)palloc0(strlen(protoname) + 16);
+	sprintf(new_func_name, "%s_blocklocation", protoname);
+	funcname = lappend(funcname, makeString(new_func_name));
+	returnOid = VOIDOID;
+	procOid = LookupFuncName(funcname, 0, argList, true);
+
+	if (!OidIsValid(procOid))
+		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_FUNCTION),
+						errmsg("protocol function %s was not found.",
+								new_func_name),
+						errhint("Create it with CREATE FUNCTION."),
+						errOmitLocation(true)));
+
+	/* check return type matches */
+	if (get_func_rettype(procOid) != returnOid)
+		ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
+						errmsg("protocol function %s has an incorrect return type",
+								new_func_name),
+						errOmitLocation(true)));
+
+	/* check allowed volatility */
+	if (func_volatile(procOid) != PROVOLATILE_STABLE)
+		ereport(ERROR, (errcode(ERRCODE_INVALID_FUNCTION_DEFINITION),
+				 	 	errmsg("protocol function %s is not declared STABLE.",
+						new_func_name),
+						errOmitLocation(true)));
+	pfree(new_func_name);
+
+	return procOid;
+}
+
+static void ExternalGetHdfsFileDataLocation(
+				Relation relation,
+				split_to_segment_mapping_context *context,
+				int64 splitsize,
+				Relation_Data *rel_data,
+				int* allblocks) {
+	ExtTableEntry *ext_entry = GetExtTableEntry(rel_data->relid);
+	Assert(ext_entry->locations != NIL);
+	int64 total_size = 0;
+	int segno = 1;
+
+	/*
+	 * Step 1. get external HDFS location from URI.
+	 */
+	char* first_uri_str = (char *) strVal(lfirst(list_head(ext_entry->locations)));
+	/* We must have at least one location. */
+	Assert(first_uri_str != NULL);
+	Uri* uri = ParseExternalTableUri(first_uri_str);
+	bool isHdfs = false;
+	if (uri != NULL && is_hdfs_protocol(uri)) {
+		isHdfs = true;
+	}
+	Assert(isHdfs);  /* Currently, we accept HDFS only. */
+
+	/*
+	 * Step 2. Get the function to call for block location information. This
+	 * work is done by the validator function registered for this external
+	 * protocol.
+	 */
+	Oid procOid = InvalidOid;
+	if (isHdfs) {
+		procOid = LookupCustomProtocolBlockLocationFunc("hdfs");
+	}
+	else
+	{
+		Assert(false);
+	}
+
+	/*
+	 * Step 3. Call validator to get location data.
+	 */
+
+	/* Prepare the function call parameters from the location strings. This
+	 * is only called on the dispatcher side. */
+	List *bls = NULL; /* Block locations */
+	if (OidIsValid(procOid) && Gp_role == GP_ROLE_DISPATCH)
+	{
+		InvokeHDFSProtocolBlockLocation(procOid, ext_entry->locations, &bls);
+	}
+
+	/*
+	 * Step 4. Build data location info for optimization after this call.
+	 */
+
+	/* Go through each files */
+	ListCell *cbl = NULL;
+	foreach(cbl, bls)
+	{
+		blocklocation_file *f = (blocklocation_file *)lfirst(cbl);
+		BlockLocation *locations = f->locations;
+		int block_num = f->block_num;
+		int64 logic_len = 0;
+		*allblocks += block_num;
+		if ((locations != NULL) && (block_num > 0)) {
+			// calculate length for one specific file
+			for (int j = 0; j < block_num; ++j) {
+				logic_len += locations[j].length;
+			}
+			total_size += logic_len;
+
+			Block_Host_Index * host_index = update_data_dist_stat(context,
+					locations, block_num);
+
+			Relation_File *file = (Relation_File *) palloc(sizeof(Relation_File));
+			/* Use the trailing file name as segno when it is numeric;
+			 * otherwise assign segnos sequentially. */
+			int parsed_segno = atoi(strrchr(f->file_uri, '/') + 1);
+			if (parsed_segno > 0)
+				file->segno = parsed_segno;
+			else
+				file->segno = segno++;
+			file->block_num = block_num;
+			file->locations = locations;
+			file->hostIDs = host_index;
+			file->logic_len = logic_len;
+
+			/* Split the file: one split per block, clamped to the logical EOF. */
+			int realSplitNum = 0;
+			int split_num = file->block_num;
+			int64 offset = 0;
+			File_Split *splits = (File_Split *) palloc(sizeof(File_Split) * split_num);
+			while (realSplitNum < split_num) {
+				splits[realSplitNum].host = -1;
+				splits[realSplitNum].is_local_read = true;
+				splits[realSplitNum].offset = offset;
+				splits[realSplitNum].length = file->locations[realSplitNum].length;
+				splits[realSplitNum].logiceof = logic_len;
+				splits[realSplitNum].ext_file_uri = pstrdup(f->file_uri);
+
+				if (logic_len - offset <= splits[realSplitNum].length) {
+					splits[realSplitNum].length = logic_len - offset;
+					++realSplitNum;
+					break;
+				}
+				offset += splits[realSplitNum].length;
+				++realSplitNum;
+			}
+			file->split_num = realSplitNum;
+			file->splits = splits;
+			context->total_split_count += realSplitNum;
+
+			rel_data->files = lappend(rel_data->files, file);
+		}
+	}
+	context->total_metadata_logic_len += total_size;
+	rel_data->total_size = total_size;
+}
 /*
  * step 1 search segments with local read and segment is not full after being assigned the block
  * step 2 search segments with local read and segment is not full before being assigned the block
@@ -4021,14 +4359,17 @@ void find_udf(Query *query, udf_collector_context *context) {
  * fixedVsegNum is used by PBE, since all the execute should use the same number of vsegs.
  */
 SplitAllocResult *
-calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
-		List *fullRangeTable, GpPolicy *intoPolicy, int sliceNum, int fixedVsegNum) {
+calculate_planner_segment_num(PlannedStmt *plannedstmt, Query *query,
+		QueryResourceLife resourceLife, int fixedVsegNum) {
 	SplitAllocResult *result = NULL;
 	QueryResource *resource = NULL;
-
 	List *virtual_segments = NIL;
 	List *alloc_result = NIL;
+	Node *planTree = plannedstmt->planTree;
+	GpPolicy *intoPolicy = plannedstmt->intoPolicy;
+	int sliceNum = plannedstmt->nMotionNodes + plannedstmt->nInitPlans + 1;
 	split_to_segment_mapping_context context;
+	context.chsl_context.relations = NIL;
 
 	int planner_segments = 0; /*virtual segments number for explain statement */
 
@@ -4061,9 +4402,11 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 	{
 		init_datalocality_memory_context();
 
-		init_datalocality_context(&context);
+		init_datalocality_context(plannedstmt, &context);
+
+		collect_range_tables(query, &(context.rtc_context));
 
-		collect_range_tables(query, fullRangeTable, &(context.rtc_context));
+		collect_scan_rangetable(planTree, &(context.srtc_context));
 
 		bool isTableFunctionExists = false;
 
@@ -4104,6 +4447,9 @@ calculate_planner_segment_num(Query *query, QueryResourceLife resourceLife,
 
 		/* get block location and calculate relation size*/
 		get_block_locations_and_claculte_table_size(&context);
 
 		/*use inherit resource*/
 		if (resourceLife == QRL_INHERIT) {
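
The dispatcher-side reader in InvokeHDFSProtocolBlockLocation() implies a
matching producer contract: the <protocol>_blocklocation function resolved
by LookupCustomProtocolBlockLocationFunc() hands an
ExtProtocolBlockLocationData back through fcinfo->resultinfo. A hedged
sketch of that producer side (field names follow the reader code above; the
file URI and block count are placeholders for what the HDFS namenode would
report):

    Datum
    hdfsprotocol_blocklocation(PG_FUNCTION_ARGS)
    {
        ExtProtocolBlockLocationData *bls =
            palloc0(sizeof(ExtProtocolBlockLocationData));
        bls->type  = T_ExtProtocolBlockLocationData;
        bls->files = NIL;

        /* One blocklocation_file per file under the location URI. */
        blocklocation_file *blf = palloc0(sizeof(blocklocation_file));
        blf->file_uri  = pstrdup("hdfs://namenode:8020/data/1"); /* placeholder */
        blf->block_num = 1;                                      /* placeholder */
        blf->locations = palloc0(sizeof(BlockLocation) * blf->block_num);
        /* ... fill offset, length, numOfNodes, hosts[], names[], and
         *     topologyPaths[] for every block ... */
        bls->files = lappend(bls->files, blf);

        fcinfo->resultinfo = (Node *) bls; /* read back by the dispatcher */
        PG_RETURN_VOID();
    }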

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/1c189fc1/src/backend/cdb/cdbpartition.c
----------------------------------------------------------------------
diff --git a/src/backend/cdb/cdbpartition.c b/src/backend/cdb/cdbpartition.c
index a887ee4..963983f 100644
--- a/src/backend/cdb/cdbpartition.c
+++ b/src/backend/cdb/cdbpartition.c
@@ -4787,9 +4787,9 @@ get_part_rule(Relation rel,
 static void
 fixup_table_storage_options(CreateStmt *ct)
 {
-	if (!ct->options)
+	if (!ct->base.options)
 	{
-    	ct->options = list_make2(makeDefElem("appendonly",
+    	ct->base.options = list_make2(makeDefElem("appendonly",
 											 (Node *)makeString("true")),
 								 makeDefElem("orientation",
 											 (Node *)makeString("column")));
@@ -5243,11 +5243,11 @@ atpxPartAddList(Relation rel,
 	ct = (CreateStmt *)linitial((List *)pUtl);
 	Assert(IsA(ct, CreateStmt));
 	
-	ct->is_add_part = true; /* subroutines need to know this */
+	ct->base.is_add_part = true; /* subroutines need to know this */
 	ct->ownerid = ownerid;
 	
-	if (!ct->distributedBy)
-		ct->distributedBy = make_dist_clause(rel);
+	if (!ct->base.distributedBy)
+		ct->base.distributedBy = make_dist_clause(rel);
 	
 	if (bSetTemplate)
 	/* if creating a template, silence partition name messages */
@@ -6377,7 +6377,7 @@ atpxPartAddList(Relation rel,
 	 * name of the parent relation
 	 */
 	
-	ct->relation = makeRangeVar(
+	ct->base.relation = makeRangeVar(
 								NULL /*catalogname*/, 
 								get_namespace_name(RelationGetNamespace(par_rel)),
 								RelationGetRelationName(par_rel), -1);
@@ -6429,7 +6429,7 @@ atpxPartAddList(Relation rel,
 		Oid 			 skipTableRelid	 	 = InvalidOid; 
 		List			*attr_encodings		 = NIL;
 		
-		ct->partitionBy = (Node *)pBy;
+		ct->base.partitionBy = (Node *)pBy;
 
 		/* this parse_analyze expands the phony create of a partitioned table
 		 * that we just build into the constituent commands we need to create
@@ -6517,7 +6517,7 @@ atpxPartAddList(Relation rel,
 						gs->objtype = ACL_OBJECT_RELATION;
 						gs->cooked_privs = cp;
 						
-						gs->objects = list_make1(copyObject(t->relation));
+						gs->objects = list_make1(copyObject(t->base.relation));
 						
 						pt = parse_analyze((Node *)gs, NULL, NULL, 0);
 						l1 = list_concat(l1, pt);
@@ -6545,7 +6545,7 @@ atpxPartAddList(Relation rel,
 			{
 				CreateStmt *t = (CreateStmt *)((Query *)s)->utilityStmt;
 				
-				skipTableRelid = RangeVarGetRelid(t->relation, true, false /*allowHcatalog*/);
+				skipTableRelid = RangeVarGetRelid(t->base.relation, true, false /*allowHcatalog*/);
 			}
 		}
 
@@ -8376,7 +8376,7 @@ fixCreateStmtForPartitionedTable(CreateStmt *stmt)
 	/* Caller should check this! */
 	Assert(stmt->partitionBy && !stmt->is_part_child);
 	
-	foreach( lc_elt, stmt->tableElts )
+	foreach( lc_elt, stmt->base.tableElts )
 	{
 		Node * elt = lfirst(lc_elt);
 		
@@ -8512,7 +8512,7 @@ fixCreateStmtForPartitionedTable(CreateStmt *stmt)
 				FkConstraint *fcon = list_nth(unnamed_cons, i);
 			
 				fcon->constr_name = 
-					ChooseConstraintNameForPartitionCreate(stmt->relation->relname,
+					ChooseConstraintNameForPartitionCreate(stmt->base.relation->relname,
 														   colname,
 														   label,
 														   used_names);
@@ -8528,7 +8528,7 @@ fixCreateStmtForPartitionedTable(CreateStmt *stmt)
 				if ( 0 != strcmp(label, "pkey") )
 					colname = list_nth(unnamed_cons_col, i);
 				
-				con->name = ChooseConstraintNameForPartitionCreate(stmt->relation->relname,
+				con->name = ChooseConstraintNameForPartitionCreate(stmt->base.relation->relname,
 																   colname,
 																   label,
 																   used_names);
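
The ct->X to ct->base.X edits throughout this file assume the CreateStmt
refactor from src/include/nodes/parsenodes.h in this commit, which folds
the fields shared with the new external-table statement into an embedded
base struct. A hypothetical outline of that shape (field set abbreviated;
the actual definition is in parsenodes.h):

    typedef struct CreateStmtBase
    {
        RangeVar   *relation;      /* relation to create */
        List       *tableElts;     /* column definitions (ColumnDef list) */
        List       *options;       /* WITH (...) storage options */
        Node       *distributedBy; /* DISTRIBUTED BY clause */
        Node       *partitionBy;   /* PARTITION BY clause */
        bool        is_add_part;   /* internal: ADD PARTITION in progress */
    } CreateStmtBase;

    typedef struct CreateStmt
    {
        NodeTag         type;
        CreateStmtBase  base;      /* shared fields, accessed as ct->base.X */
        Oid             ownerid;   /* plain-CREATE-TABLE fields follow */
    } CreateStmt;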