You are viewing a plain text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to commits@hawq.apache.org by sh...@apache.org on 2016/07/20 23:42:25 UTC

incubator-hawq git commit: HAWQ-927. Pass ProjectionInfo data to PXF

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 2f95286e0 -> 24d4d967e


HAWQ-927. Pass ProjectionInfo data to PXF

This commit makes the necessary modifications to the HAWQ side of
the codebase to pass the list of indices of projected columns to PXF.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/24d4d967
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/24d4d967
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/24d4d967

Branch: refs/heads/master
Commit: 24d4d967e767fc640ad6ab3c91b2ce26bb96de66
Parents: 2f95286
Author: Kavinder Dhaliwal <ka...@gmail.com>
Authored: Mon Jul 11 18:20:16 2016 -0700
Committer: Kavinder Dhaliwal <ka...@gmail.com>
Committed: Wed Jul 20 16:36:08 2016 -0700

----------------------------------------------------------------------
 src/backend/access/external/fileam.c     | 37 +++++++++++++++++----------
 src/backend/access/external/pxfheaders.c | 35 +++++++++++++++++++++++--
 src/backend/access/external/url.c        | 15 ++++++-----
 src/backend/executor/nodeExternalscan.c  |  6 +++--
 src/bin/gpfusion/gpbridgeapi.c           |  2 ++
 src/include/access/extprotocol.h         |  5 +++-
 src/include/access/fileam.h              | 14 +++++++++-
 src/include/access/pxfheaders.h          |  1 +
 src/include/access/url.h                 |  2 +-
 9 files changed, 91 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 645e6dc..70a115a 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -79,7 +79,7 @@
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
 
-static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir);
+static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc);
 static void InitParseState(CopyState pstate, Relation relation,
 						   Datum* values, bool* nulls, bool writable,
 						   List *fmtOpts, char fmtType,
@@ -97,7 +97,7 @@ static void FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
 
 static void open_external_readable_source(FileScanDesc scan);
 static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
-static int	external_getdata(URL_FILE *extfile, CopyState pstate, int maxread);
+static int	external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc);
 static void external_senddata(URL_FILE *extfile, CopyState pstate);
 static void external_scan_error_callback(void *arg);
 void readHeaderLine(CopyState pstate);
@@ -454,6 +454,17 @@ external_stopscan(FileScanDesc scan)
 	}
 }
 
+/*	----------------
+ *		external_getnext_init - prepare ExternalSelectDesc struct before external_getnext
+ *	----------------
+ */
+ExternalSelectDesc
+external_getnext_init(PlanState *state) {
+	ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData));
+	if (state != NULL)
+		desc->projInfo = state->ps_ProjInfo;
+	return desc;
+}
 
 /* ----------------------------------------------------------------
 *		external_getnext
@@ -462,7 +473,7 @@ external_stopscan(FileScanDesc scan)
 * ----------------------------------------------------------------
 */
 HeapTuple
-external_getnext(FileScanDesc scan, ScanDirection direction)
+external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc)
 {
 	HeapTuple	tuple;
 
@@ -486,7 +497,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction)
 	/* Note: no locking manipulations needed */
 	FILEDEBUG_1;
 
-	tuple = externalgettup(scan, direction);
+	tuple = externalgettup(scan, direction, desc);
 
 
 	if (tuple == NULL)
@@ -969,7 +980,7 @@ static DataLineStatus parse_next_line(FileScanDesc scan)
 }
 
 static HeapTuple
-externalgettup_defined(FileScanDesc scan)
+externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
 {
 		HeapTuple	tuple = NULL;
 		CopyState	pstate = scan->fs_pstate;
@@ -981,7 +992,7 @@ externalgettup_defined(FileScanDesc scan)
 			/* need to fill our buffer with data? */
 			if (pstate->raw_buf_done)
 			{
-			    pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE);
+			    pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
 				pstate->begloc = pstate->raw_buf;
 				pstate->raw_buf_done = (pstate->bytesread==0);
 				pstate->raw_buf_index = 0;
@@ -1072,7 +1083,7 @@ externalgettup_defined(FileScanDesc scan)
 }
 
 static HeapTuple
-externalgettup_custom(FileScanDesc scan)
+externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
 {
 		HeapTuple   	tuple;
 		CopyState		pstate = scan->fs_pstate;
@@ -1088,7 +1099,7 @@ externalgettup_custom(FileScanDesc scan)
 			/* need to fill our buffer with data? */
 			if (pstate->raw_buf_done)
 			{
-				int	 bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE);
+				int	 bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
 				if ( bytesread > 0 )
 					appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
 				pstate->raw_buf_done = false;
@@ -1230,7 +1241,7 @@ externalgettup_custom(FileScanDesc scan)
 */
 static HeapTuple
 externalgettup(FileScanDesc scan,
-		    ScanDirection dir __attribute__((unused)))
+		    ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc)
 {
 
 	CopyState	pstate = scan->fs_pstate;
@@ -1252,9 +1263,9 @@ externalgettup(FileScanDesc scan,
 	}
 
 	if (!custom)
-		return externalgettup_defined(scan); /* text/csv */
+		return externalgettup_defined(scan, desc); /* text/csv */
 	else
-		return externalgettup_custom(scan);  /* custom   */
+		return externalgettup_custom(scan, desc);  /* custom   */
 
 }
 /*
@@ -1747,7 +1758,7 @@ close_external_source(FILE *dataSource, bool failOnError, const char *relname)
  * get a chunk of data from the external data file.
  */
 static int
-external_getdata(URL_FILE *extfile, CopyState pstate, int maxread)
+external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc)
 {
 	int			bytesread = 0;
 
@@ -1759,7 +1770,7 @@ external_getdata(URL_FILE *extfile, CopyState pstate, int maxread)
  	*/
 
 
-	bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate);
+	bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc);
 
 	if (url_feof(extfile, bytesread))
  	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/pxfheaders.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfheaders.c b/src/backend/access/external/pxfheaders.c
index 579e705..e653d30 100644
--- a/src/backend/access/external/pxfheaders.c
+++ b/src/backend/access/external/pxfheaders.c
@@ -34,6 +34,7 @@ static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri *gphd
 static char* prepend_x_gp(const char* key);
 static void add_delegation_token_headers(CHURL_HEADERS headers, PxfInputData *inputData);
 static void add_remote_credentials(CHURL_HEADERS headers);
+static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo);
 
 /* 
  * Add key/value pairs to connection header. 
@@ -47,6 +48,7 @@ void build_http_header(PxfInputData *input)
 	GPHDUri *gphduri = input->gphduri;
 	Relation rel = input->rel;
 	char *filterstr = input->filterstr;
+	ProjectionInfo *proj_info = input->proj_info;
 	
 	if (rel != NULL)
 	{
@@ -60,6 +62,11 @@ void build_http_header(PxfInputData *input)
 		add_tuple_desc_httpheader(headers, rel);
 	}
 	
+	if (proj_info != NULL)
+	{
+		add_projection_desc_httpheader(headers, proj_info);
+	}
+
 	/* GP cluster configuration */
 	external_set_env_vars(&ev, gphduri->uri, false, NULL, NULL, false, 0);
 	
@@ -123,7 +130,8 @@ static void add_alignment_size_httpheader(CHURL_HEADERS headers)
  */
 static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel)
 {	
-    char long_number[32];	
+    char long_number[sizeof(int32) * 8];
+
     StringInfoData formatter;	
     TupleDesc tuple;		
     initStringInfo(&formatter);
@@ -133,7 +141,7 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel)
 	
     /* Convert the number of attributes to a string */	
     pg_ltoa(tuple->natts, long_number);	
-    churl_headers_append(headers, "X-GP-ATTRS", long_number);	
+    churl_headers_append(headers, "X-GP-ATTRS", long_number);
 	
     /* Iterate attributes */	
     for (int i = 0; i < tuple->natts; ++i)		
@@ -158,6 +166,29 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel)
 	pfree(formatter.data);
 }
 
+static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo) {
+    int i;
+    char long_number[sizeof(int32) * 8];
+    int *varNumbers = projInfo->pi_varNumbers;
+    StringInfoData formatter;
+    initStringInfo(&formatter);
+
+    /* Convert the number of projection columns to a string */
+    pg_ltoa(list_length(projInfo->pi_targetlist), long_number);
+    churl_headers_append(headers, "X-GP-ATTRS-PROJ", long_number);
+
+    for(i = 0; i < list_length(projInfo->pi_targetlist); i++) {
+        int number = varNumbers[i] - 1;
+        pg_ltoa(number, long_number);
+        resetStringInfo(&formatter);
+        appendStringInfo(&formatter, "X-GP-ATTRS-PROJ-IDX");
+
+        churl_headers_append(headers, formatter.data,long_number);
+    }
+
+    pfree(formatter.data);
+}
+
 /* 
  * The options in the LOCATION statement of "create extenal table"
  * FRAGMENTER=HdfsDataFragmenter&ACCESSOR=SequenceFileAccessor... 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/url.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/url.c b/src/backend/access/external/url.c
index 62a7ce0..97cdb92 100644
--- a/src/backend/access/external/url.c
+++ b/src/backend/access/external/url.c
@@ -86,7 +86,8 @@ static int32  InvokeExtProtocol(void		*ptr,
 								size_t 		nbytes, 
 								URL_FILE 	*file, 
 								CopyState 	pstate,
-								bool		last_call);
+								bool		last_call,
+								ExternalSelectDesc desc);
 void extract_http_domain(char* i_path, char* o_domain, int dlen);
 
 
@@ -1270,7 +1271,7 @@ url_fclose(URL_FILE *file, bool failOnError, const char *relname)
 			
 			/* last call. let the user close custom resources */
 			if(file->u.custom.protocol_udf)
-				(void) InvokeExtProtocol(NULL, 0, file, NULL, true);
+				(void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL);
 
 			/* now clean up everything not cleaned by user */
 			MemoryContextDelete(file->u.custom.protcxt);
@@ -1774,7 +1775,7 @@ static size_t curl_fwrite(char *buf, int nbytes, URL_FILE* file, CopyState pstat
 
 
 size_t
-url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate)
+url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc)
 {
     size_t 	want;
 	int 	n;
@@ -1821,7 +1822,7 @@ url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate
 
 		case CFTYPE_CUSTOM:
 			
-			want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false);
+			want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc);
 			break;
 				
 		default: /* unknown or supported type */
@@ -1859,7 +1860,7 @@ url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstat
 		
 		case CFTYPE_CUSTOM:
 						
-			want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false);
+			want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL);
 			break;
 			
 		default: /* unknown or unsupported type */
@@ -2296,7 +2297,8 @@ InvokeExtProtocol(void 	   		*ptr,
 				  size_t 		 nbytes, 
 				  URL_FILE 		*file, 
 				  CopyState 	 pstate,
-				  bool			 last_call)
+				  bool			 last_call,
+				  ExternalSelectDesc desc)
 {
 	FunctionCallInfoData	fcinfo;
 	ExtProtocolData*		extprotocol = file->u.custom.extprotocol;
@@ -2314,6 +2316,7 @@ InvokeExtProtocol(void 	   		*ptr,
 	extprotocol->prot_maxbytes = nbytes;
 	extprotocol->prot_scanquals = file->u.custom.scanquals;
 	extprotocol->prot_last_call = last_call;
+	extprotocol->desc = desc;
 	
 	InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, 
 							 /* FmgrInfo */ extprotocol_udf, 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/executor/nodeExternalscan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c
index b19b25e..2831faa 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -66,6 +66,7 @@ ExternalNext(ExternalScanState *node)
 	EState	   *estate;
 	ScanDirection direction;
 	TupleTableSlot *slot;
+	ExternalSelectDesc externalSelectDesc;
 
 	/*
 	 * get information from the estate and scan state
@@ -79,7 +80,8 @@ ExternalNext(ExternalScanState *node)
 	/*
 	 * get the next tuple from the file access methods
 	 */
-	tuple = external_getnext(scandesc, direction);
+	externalSelectDesc = external_getnext_init(&(node->ss.ps));
+	tuple = external_getnext(scandesc, direction, externalSelectDesc);
 
 	/*
 	 * save the tuple and the buffer returned to us by the access methods in
@@ -113,7 +115,7 @@ ExternalNext(ExternalScanState *node)
 			ExecEagerFreeExternalScan(node);
 		}
 	}
-
+	pfree(externalSelectDesc);
 
 	return slot;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index 011b0fc..465c38d 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -68,6 +68,7 @@ void	free_token_resources(PxfInputData *inputData);
 Datum gpbridge_import(PG_FUNCTION_ARGS)
 {
 	gpbridge_check_inside_extproto(fcinfo, "gpbridge_import");
+//	ExternalSelectDesc desc = EXTPROTOCOL_GET_SELECTDESC(fcinfo);
 
 	if (gpbridge_last_call(fcinfo))
 		PG_RETURN_INT32(gpbridge_cleanup(fcinfo));
@@ -181,6 +182,7 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS)
 	inputData.gphduri = context->gphd_uri;
 	inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo);
 	inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo));
+	inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo);
 	add_delegation_token(&inputData);
 	
 	build_http_header(&inputData);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index d70c844..ca7d492 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -33,6 +33,8 @@
 
 /* ------------------------- I/O function API -----------------------------*/
 
+struct ExternalSelectDescData;
+typedef struct ExternalSelectDescData *ExternalSelectDesc;
 /*
  * ExtProtocolData is the node type that is passed as fmgr "context" info
  * when a function is called by the External Table protocol manager.
@@ -47,6 +49,7 @@ typedef struct ExtProtocolData
 	void		   *prot_user_ctx;
 	bool			prot_last_call;
 	List		   *prot_scanquals;
+	ExternalSelectDesc desc;
 
 } ExtProtocolData;
 
@@ -61,13 +64,13 @@ typedef ExtProtocolData *ExtProtocol;
 #define EXTPROTOCOL_GET_DATALEN(fcinfo)    (((ExtProtocolData*) fcinfo->context)->prot_maxbytes)
 #define EXTPROTOCOL_GET_SCANQUALS(fcinfo)    (((ExtProtocolData*) fcinfo->context)->prot_scanquals)
 #define EXTPROTOCOL_GET_USER_CTX(fcinfo)   (((ExtProtocolData*) fcinfo->context)->prot_user_ctx)
+#define EXTPROTOCOL_GET_PROJINFO(fcinfo) (((ExtProtocolData*) fcinfo->context)->desc->projInfo)
 #define EXTPROTOCOL_IS_LAST_CALL(fcinfo)   (((ExtProtocolData*) fcinfo->context)->prot_last_call)
 
 #define EXTPROTOCOL_SET_LAST_CALL(fcinfo)  (((ExtProtocolData*) fcinfo->context)->prot_last_call = true)
 #define EXTPROTOCOL_SET_USER_CTX(fcinfo, p) \
 	(((ExtProtocolData*) fcinfo->context)->prot_user_ctx = p)
 
-
 /* ------------------------- Validator function API -----------------------------*/
 
 typedef enum ValidatorDirection

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/fileam.h
----------------------------------------------------------------------
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index 145cb84..5a5f532 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -63,6 +63,17 @@ typedef struct ExternalInsertDescData
 
 typedef ExternalInsertDescData *ExternalInsertDesc;
 
+/*
+ * ExternalSelectDescData is used for storing state related
+ * to selecting data from an external table.
+ */
+typedef struct ExternalSelectDescData
+{
+	ProjectionInfo *projInfo;
+} ExternalSelectDescData;
+
+typedef ExternalSelectDescData *ExternalSelectDesc;
+
 typedef enum DataLineStatus
 {
 	LINE_OK,
@@ -80,7 +91,8 @@ extern FileScanDesc external_beginscan(Relation relation, Index scanrelid,
 extern void external_rescan(FileScanDesc scan);
 extern void external_endscan(FileScanDesc scan);
 extern void external_stopscan(FileScanDesc scan);
-extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction);
+extern ExternalSelectDesc external_getnext_init(PlanState *state);
+extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc);
 extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno);
 extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);
 extern void external_insert_finish(ExternalInsertDesc extInsertDesc);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/pxfheaders.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfheaders.h b/src/include/access/pxfheaders.h
index 13853ab..da3da7f 100644
--- a/src/include/access/pxfheaders.h
+++ b/src/include/access/pxfheaders.h
@@ -44,6 +44,7 @@ typedef struct sPxfInputData
 	Relation		rel;
 	char			*filterstr;
 	PxfHdfsToken	token;
+	ProjectionInfo  *proj_info;
 } PxfInputData;
 
 void build_http_header(PxfInputData *input);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/url.h
----------------------------------------------------------------------
diff --git a/src/include/access/url.h b/src/include/access/url.h
index d837b68..9de492d 100644
--- a/src/include/access/url.h
+++ b/src/include/access/url.h
@@ -159,7 +159,7 @@ extern URL_FILE *url_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pst
 extern int url_fclose(URL_FILE *file, bool failOnError, const char *relname);
 extern bool url_feof(URL_FILE *file, int bytesread);
 extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen);
-extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate);
+extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc);
 extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate);
 extern void url_rewind(URL_FILE *file, const char *relname);
 extern void url_fflush(URL_FILE *file, CopyState pstate);