You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2017/12/07 12:28:08 UTC

incubator-hawq git commit: HAWQ-1564. Add Pluggable Storage Dependent Information

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 9578ab04c -> 816782bd8


HAWQ-1564. Add Pluggable Storage Dependent Information

The information added mainly concerns external URIs, block locations, file splits, and formatter actions.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/816782bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/816782bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/816782bd

Branch: refs/heads/master
Commit: 816782bd84c5ac001a86afa907a81794530e3433
Parents: 9578ab0
Author: Chiyang Wan <ch...@gmail.com>
Authored: Tue Dec 5 09:57:02 2017 +0800
Committer: Chiyang Wan <ch...@gmail.com>
Committed: Wed Dec 6 12:35:34 2017 +0800

----------------------------------------------------------------------
 src/backend/access/external/fileam.c | 26 +++++++++++++------------
 src/backend/access/external/url.c    | 32 +++++++++++++++++++++++++------
 src/backend/nodes/copyfuncs.c        |  1 +
 src/backend/nodes/outfast.c          |  1 +
 src/backend/nodes/outfuncs.c         |  1 +
 src/backend/nodes/readfast.c         |  1 +
 src/include/access/extprotocol.h     | 22 +++++++++++++++++++--
 src/include/access/filesplit.h       |  1 +
 src/include/access/formatter.h       |  7 ++++++-
 src/include/access/relscan.h         | 15 +++++++++++++++
 src/include/access/url.h             |  2 +-
 11 files changed, 87 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 099dae5..6a59b95 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -79,7 +79,7 @@
 #include "cdb/cdbutil.h"
 #include "cdb/cdbvars.h"
 
-static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc);
+static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc, ScanState *ss);
 static void InitParseState(CopyState pstate, Relation relation,
 						   Datum* values, bool* nulls, bool writable,
 						   List *fmtOpts, char fmtType,
@@ -97,7 +97,7 @@ static void FunctionCallPrepareFormatter(FunctionCallInfoData*	fcinfo,
 
 static void open_external_readable_source(FileScanDesc scan);
 static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
-static int	external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc);
+static int	external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc, ScanState *ss);
 static void external_senddata(URL_FILE *extfile, CopyState pstate);
 static void external_scan_error_callback(void *arg);
 void readHeaderLine(CopyState pstate);
@@ -487,6 +487,7 @@ HeapTuple
 external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc)
 {
 	HeapTuple	tuple;
+	ScanState *ss = NULL; /* a temporary dummy for the following steps */
 
 	if (scan->fs_noop)
 		return NULL;
@@ -508,7 +509,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc
 	/* Note: no locking manipulations needed */
 	FILEDEBUG_1;
 
-	tuple = externalgettup(scan, direction, desc);
+	tuple = externalgettup(scan, direction, desc, ss);
 
 
 	if (tuple == NULL)
@@ -991,7 +992,7 @@ static DataLineStatus parse_next_line(FileScanDesc scan)
 }
 
 static HeapTuple
-externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
+externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
 {
 		HeapTuple	tuple = NULL;
 		CopyState	pstate = scan->fs_pstate;
@@ -1003,7 +1004,7 @@ externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
 			/* need to fill our buffer with data? */
 			if (pstate->raw_buf_done)
 			{
-			    pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
+			    pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
 				pstate->begloc = pstate->raw_buf;
 				pstate->raw_buf_done = (pstate->bytesread==0);
 				pstate->raw_buf_index = 0;
@@ -1094,7 +1095,7 @@ externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
 }
 
 static HeapTuple
-externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
+externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
 {
 		HeapTuple   	tuple;
 		CopyState		pstate = scan->fs_pstate;
@@ -1110,7 +1111,7 @@ externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
 			/* need to fill our buffer with data? */
 			if (pstate->raw_buf_done)
 			{
-				int	 bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
+				int	 bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
 				if ( bytesread > 0 )
 					appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
 				pstate->raw_buf_done = false;
@@ -1252,7 +1253,7 @@ externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
 */
 static HeapTuple
 externalgettup(FileScanDesc scan,
-		    ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc)
+		    ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc, ScanState *ss)
 {
 
 	CopyState	pstate = scan->fs_pstate;
@@ -1274,9 +1275,9 @@ externalgettup(FileScanDesc scan,
 	}
 
 	if (!custom)
-		return externalgettup_defined(scan, desc); /* text/csv */
+		return externalgettup_defined(scan, desc, ss); /* text/csv */
 	else
-		return externalgettup_custom(scan, desc);  /* custom   */
+		return externalgettup_custom(scan, desc, ss);  /* custom   */
 
 }
 /*
@@ -1769,7 +1770,7 @@ close_external_source(FILE *dataSource, bool failOnError, const char *relname)
  * get a chunk of data from the external data file.
  */
 static int
-external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc)
+external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc, ScanState *ss)
 {
 	int			bytesread = 0;
 
@@ -1781,7 +1782,8 @@ external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelec
  	*/
 
 
-	bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc);
+	bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc,
+						  (ss == NULL ? NULL : &(ss->splits)));
 
 	if (url_feof(extfile, bytesread))
  	{

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/access/external/url.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/url.c b/src/backend/access/external/url.c
index 7f586db..7aadc95 100644
--- a/src/backend/access/external/url.c
+++ b/src/backend/access/external/url.c
@@ -24,6 +24,7 @@
 #include <sys/wait.h>
 
 #include "access/fileam.h"
+#include "access/filesplit.h"
 #include "access/heapam.h"
 #include "access/valid.h"
 #include "catalog/pg_extprotocol.h"
@@ -88,7 +89,8 @@ static int32  InvokeExtProtocol(void		*ptr,
 								URL_FILE 	*file, 
 								CopyState 	pstate,
 								bool		last_call,
-								ExternalSelectDesc desc);
+								ExternalSelectDesc desc,
+								List		**psplits);
 void extract_http_domain(char* i_path, char* o_domain, int dlen);
 
 
@@ -1272,7 +1274,7 @@ url_fclose(URL_FILE *file, bool failOnError, const char *relname)
 			
 			/* last call. let the user close custom resources */
 			if(file->u.custom.protocol_udf)
-				(void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL);
+				(void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL, NULL);
 
 			/* now clean up everything not cleaned by user */
 			MemoryContextDelete(file->u.custom.protcxt);
@@ -1776,7 +1778,7 @@ static size_t curl_fwrite(char *buf, int nbytes, URL_FILE* file, CopyState pstat
 
 
 size_t
-url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc)
+url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc, List **splits)
 {
     size_t 	want;
 	int 	n;
@@ -1823,7 +1825,7 @@ url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate
 
 		case CFTYPE_CUSTOM:
 			
-			want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc);
+			want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc, splits);
 			break;
 				
 		default: /* unknown or supported type */
@@ -1861,7 +1863,7 @@ url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstat
 		
 		case CFTYPE_CUSTOM:
 						
-			want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL);
+			want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL, NULL);
 			break;
 			
 		default: /* unknown or unsupported type */
@@ -2299,7 +2301,8 @@ InvokeExtProtocol(void 	   		*ptr,
 				  URL_FILE 		*file, 
 				  CopyState 	 pstate,
 				  bool			 last_call,
-				  ExternalSelectDesc desc)
+				  ExternalSelectDesc desc,
+				  List		   **psplits)
 {
 	FunctionCallInfoData	fcinfo;
 	ExtProtocolData*		extprotocol = file->u.custom.extprotocol;
@@ -2318,7 +2321,24 @@ InvokeExtProtocol(void 	   		*ptr,
 	extprotocol->prot_scanquals = file->u.custom.scanquals;
 	extprotocol->prot_last_call = last_call;
 	extprotocol->desc = desc;
+	extprotocol->splits = NULL;
 	
+	if (psplits != NULL && *psplits != NULL) {
+		/*
+		 * We move to read splits from arg to this context structure, so that
+		 * means we passed split data only the first time this is called.
+		 */
+		while( list_length(*psplits)>0 )
+		{
+			FileSplit split = (FileSplit)lfirst(list_head(*psplits));
+			elog(LOG, "split %s:" INT64_FORMAT ", " INT64_FORMAT,
+					  split->ext_file_uri_string,
+					  split->offsets, split->lengths);
+			extprotocol->splits = lappend(extprotocol->splits, split);
+			*psplits = list_delete_first(*psplits);
+		}
+	}
+
 	InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo, 
 							 /* FmgrInfo */ extprotocol_udf, 
 							 /* nArgs */ 0, 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/copyfuncs.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index b16db04..b57c139 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4219,6 +4219,7 @@ _copyFileSplitNode(FileSplitNode *from)
   COPY_SCALAR_FIELD(logiceof);
   COPY_SCALAR_FIELD(offsets);
   COPY_SCALAR_FIELD(lengths);
+  COPY_STRING_FIELD(ext_file_uri_string);
 
   return newnode;
 }

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/outfast.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c
index 348bb2c..749e1d3 100644
--- a/src/backend/nodes/outfast.c
+++ b/src/backend/nodes/outfast.c
@@ -2201,6 +2201,7 @@ _outFileSplitNode(StringInfo str, FileSplitNode *node)
 	WRITE_INT64_FIELD(logiceof);
 	WRITE_INT64_FIELD(offsets);
 	WRITE_INT64_FIELD(lengths);
+	WRITE_STRING_FIELD(ext_file_uri_string);
 }
 
 static void

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/outfuncs.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 03a2550..cf6bf04 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -4053,6 +4053,7 @@ _outFileSplitNode(StringInfo str, FileSplitNode *node)
 	WRITE_INT64_FIELD(logiceof);
 	WRITE_INT64_FIELD(offsets);
 	WRITE_INT64_FIELD(lengths);
+	WRITE_STRING_FIELD(ext_file_uri_string);
 }
 
 static void

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/readfast.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c
index a77a217..1be9620 100644
--- a/src/backend/nodes/readfast.c
+++ b/src/backend/nodes/readfast.c
@@ -2294,6 +2294,7 @@ _readFileSplitNode(const char **str)
 	READ_INT64_FIELD(logiceof);
 	READ_INT64_FIELD(offsets);
 	READ_INT64_FIELD(lengths);
+	READ_STRING_FIELD(ext_file_uri_string);
 	READ_DONE();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index c1aa724..674284b 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -50,7 +50,7 @@ typedef struct ExtProtocolData
 	bool			prot_last_call;
 	List		   *prot_scanquals;
 	ExternalSelectDesc desc;
-
+	List		   *splits;			/* splits to read from external protocol */
 } ExtProtocolData;
 
 typedef ExtProtocolData *ExtProtocol;
@@ -81,6 +81,17 @@ typedef enum ValidatorDirection
 } ValidatorDirection;
 
 /*
+ * Indicate the validator to validate arguments when creating external table or
+ * let validator fetch block location information. This design is to avoid
+ * changing catalog table.
+ */
+typedef enum ValidatorAction
+{
+	EXT_VALID_ACT_ARGUMENTS,
+	EXT_VALID_ACT_GETBLKLOC
+} ValidatorAction;
+
+/*
  * ExtProtocolValidatorData is the node type that is passed as fmgr "context" info
  * when a function is called by the External Table protocol manager.
  */
@@ -91,9 +102,16 @@ typedef struct ExtProtocolValidatorData
 	List 		  		*format_opts;
 	ValidatorDirection 	 direction;  /* validating read or write? */
 	char				*errmsg;		  /* the validation error upon return, if any */
-	
+	ValidatorAction		 action;	/* indicate what action should be done. */
+	bool				forceCreateDir;
 } ExtProtocolValidatorData;
 
+typedef struct ExtProtocolBlockLocationData
+{
+	NodeTag				 type;
+	List				*files;		/* List of blocklocation_file*/
+} ExtProtocolBlockLocationData;
+
 typedef ExtProtocolValidatorData *ExtProtocolValidator;
 
 #define CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo) \

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/filesplit.h
----------------------------------------------------------------------
diff --git a/src/include/access/filesplit.h b/src/include/access/filesplit.h
index e627b07..2284d2a 100644
--- a/src/include/access/filesplit.h
+++ b/src/include/access/filesplit.h
@@ -40,6 +40,7 @@ typedef struct FileSplitNode
 	int64 logiceof;
 	int64 offsets;
 	int64 lengths;
+	char *ext_file_uri_string;
 } FileSplitNode;
 
 typedef FileSplitNode *FileSplit;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/formatter.h
----------------------------------------------------------------------
diff --git a/src/include/access/formatter.h b/src/include/access/formatter.h
index 326002c..f87afc2 100644
--- a/src/include/access/formatter.h
+++ b/src/include/access/formatter.h
@@ -48,7 +48,7 @@ typedef enum FmtNotification
 typedef struct FormatterData
 {
 	NodeTag			type;                 /* see T_FormatterData */
-	
+
 	/* metadata */
 	Relation    	fmt_relation;
 	TupleDesc   	fmt_tupDesc;
@@ -74,6 +74,11 @@ typedef struct FormatterData
 	bool			fmt_needs_transcoding;
 	FmgrInfo*		fmt_conversion_proc;
 	int				fmt_external_encoding;
+
+	/* external url */
+	char		   *fmt_url;
+	/* splits */
+	List		   *fmt_splits;
 		
 } FormatterData;
 

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/relscan.h
----------------------------------------------------------------------
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index bff990a..5fc6c76 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -18,9 +18,14 @@
 #include "access/skey.h"
 #include "access/memtup.h"
 #include "access/aosegfiles.h"
+#include "access/plugstorage_utils.h"
+#include "nodes/plannodes.h"
 #include "storage/bufpage.h"
 #include "utils/tqual.h"
 
+/* forward declaration from nodes/execnodes.h */
+typedef struct ScanState ScanState;
+
 typedef struct HeapScanDescData
 {
 	/* scan parameters */
@@ -127,6 +132,16 @@ typedef struct FileScanDescData
 	/* custom data formatter */
 	FormatterData *fs_formatter;
 	
+	/* formatter type and name */
+	int fs_formatter_type;
+	char *fs_formatter_name;
+
+	/* current scan information for pluggable format */
+	PlugStorageScanFuncs fs_ps_scan_funcs;   /* scan functions */
+	void *fs_ps_user_data;                   /* user data */
+	ScanState *fs_ps_scan_state;             /* support rescan */
+	Plan *fs_ps_plan;                        /* support rescan */
+
 }	FileScanDescData;
 
 typedef FileScanDescData *FileScanDesc;

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/url.h
----------------------------------------------------------------------
diff --git a/src/include/access/url.h b/src/include/access/url.h
index 307656a..b1ec102 100644
--- a/src/include/access/url.h
+++ b/src/include/access/url.h
@@ -159,7 +159,7 @@ extern URL_FILE *url_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pst
 extern int url_fclose(URL_FILE *file, bool failOnError, const char *relname);
 extern bool url_feof(URL_FILE *file, int bytesread);
 extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen);
-extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc);
+extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc, List **psplits);
 extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate);
 extern void url_rewind(URL_FILE *file, const char *relname);
 extern void url_fflush(URL_FILE *file, CopyState pstate);