You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2017/12/07 12:28:08 UTC
incubator-hawq git commit: HAWQ-1564. Add Pluggable Storage Dependent Information
Repository: incubator-hawq
Updated Branches:
refs/heads/master 9578ab04c -> 816782bd8
HAWQ-1564. Add Pluggable Storage Dependent Information
The added information mainly covers external URIs, block locations, file splits, and formatter actions.
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/816782bd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/816782bd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/816782bd
Branch: refs/heads/master
Commit: 816782bd84c5ac001a86afa907a81794530e3433
Parents: 9578ab0
Author: Chiyang Wan <ch...@gmail.com>
Authored: Tue Dec 5 09:57:02 2017 +0800
Committer: Chiyang Wan <ch...@gmail.com>
Committed: Wed Dec 6 12:35:34 2017 +0800
----------------------------------------------------------------------
src/backend/access/external/fileam.c | 26 +++++++++++++------------
src/backend/access/external/url.c | 32 +++++++++++++++++++++++++------
src/backend/nodes/copyfuncs.c | 1 +
src/backend/nodes/outfast.c | 1 +
src/backend/nodes/outfuncs.c | 1 +
src/backend/nodes/readfast.c | 1 +
src/include/access/extprotocol.h | 22 +++++++++++++++++++--
src/include/access/filesplit.h | 1 +
src/include/access/formatter.h | 7 ++++++-
src/include/access/relscan.h | 15 +++++++++++++++
src/include/access/url.h | 2 +-
11 files changed, 87 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 099dae5..6a59b95 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -79,7 +79,7 @@
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
-static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc);
+static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc, ScanState *ss);
static void InitParseState(CopyState pstate, Relation relation,
Datum* values, bool* nulls, bool writable,
List *fmtOpts, char fmtType,
@@ -97,7 +97,7 @@ static void FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
static void open_external_readable_source(FileScanDesc scan);
static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
-static int external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc);
+static int external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc, ScanState *ss);
static void external_senddata(URL_FILE *extfile, CopyState pstate);
static void external_scan_error_callback(void *arg);
void readHeaderLine(CopyState pstate);
@@ -487,6 +487,7 @@ HeapTuple
external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc)
{
HeapTuple tuple;
+ ScanState *ss = NULL; /* a temporary dummy for the following steps */
if (scan->fs_noop)
return NULL;
@@ -508,7 +509,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc
/* Note: no locking manipulations needed */
FILEDEBUG_1;
- tuple = externalgettup(scan, direction, desc);
+ tuple = externalgettup(scan, direction, desc, ss);
if (tuple == NULL)
@@ -991,7 +992,7 @@ static DataLineStatus parse_next_line(FileScanDesc scan)
}
static HeapTuple
-externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
+externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
{
HeapTuple tuple = NULL;
CopyState pstate = scan->fs_pstate;
@@ -1003,7 +1004,7 @@ externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
/* need to fill our buffer with data? */
if (pstate->raw_buf_done)
{
- pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
+ pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
pstate->begloc = pstate->raw_buf;
pstate->raw_buf_done = (pstate->bytesread==0);
pstate->raw_buf_index = 0;
@@ -1094,7 +1095,7 @@ externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
}
static HeapTuple
-externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
+externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc, ScanState *ss)
{
HeapTuple tuple;
CopyState pstate = scan->fs_pstate;
@@ -1110,7 +1111,7 @@ externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
/* need to fill our buffer with data? */
if (pstate->raw_buf_done)
{
- int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
+ int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc, ss);
if ( bytesread > 0 )
appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
pstate->raw_buf_done = false;
@@ -1252,7 +1253,7 @@ externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
*/
static HeapTuple
externalgettup(FileScanDesc scan,
- ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc)
+ ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc, ScanState *ss)
{
CopyState pstate = scan->fs_pstate;
@@ -1274,9 +1275,9 @@ externalgettup(FileScanDesc scan,
}
if (!custom)
- return externalgettup_defined(scan, desc); /* text/csv */
+ return externalgettup_defined(scan, desc, ss); /* text/csv */
else
- return externalgettup_custom(scan, desc); /* custom */
+ return externalgettup_custom(scan, desc, ss); /* custom */
}
/*
@@ -1769,7 +1770,7 @@ close_external_source(FILE *dataSource, bool failOnError, const char *relname)
* get a chunk of data from the external data file.
*/
static int
-external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc)
+external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc, ScanState *ss)
{
int bytesread = 0;
@@ -1781,7 +1782,8 @@ external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelec
*/
- bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc);
+ bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc,
+ (ss == NULL ? NULL : &(ss->splits)));
if (url_feof(extfile, bytesread))
{
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/access/external/url.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/url.c b/src/backend/access/external/url.c
index 7f586db..7aadc95 100644
--- a/src/backend/access/external/url.c
+++ b/src/backend/access/external/url.c
@@ -24,6 +24,7 @@
#include <sys/wait.h>
#include "access/fileam.h"
+#include "access/filesplit.h"
#include "access/heapam.h"
#include "access/valid.h"
#include "catalog/pg_extprotocol.h"
@@ -88,7 +89,8 @@ static int32 InvokeExtProtocol(void *ptr,
URL_FILE *file,
CopyState pstate,
bool last_call,
- ExternalSelectDesc desc);
+ ExternalSelectDesc desc,
+ List **psplits);
void extract_http_domain(char* i_path, char* o_domain, int dlen);
@@ -1272,7 +1274,7 @@ url_fclose(URL_FILE *file, bool failOnError, const char *relname)
/* last call. let the user close custom resources */
if(file->u.custom.protocol_udf)
- (void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL);
+ (void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL, NULL);
/* now clean up everything not cleaned by user */
MemoryContextDelete(file->u.custom.protcxt);
@@ -1776,7 +1778,7 @@ static size_t curl_fwrite(char *buf, int nbytes, URL_FILE* file, CopyState pstat
size_t
-url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc)
+url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc, List **splits)
{
size_t want;
int n;
@@ -1823,7 +1825,7 @@ url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate
case CFTYPE_CUSTOM:
- want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc);
+ want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc, splits);
break;
default: /* unknown or supported type */
@@ -1861,7 +1863,7 @@ url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstat
case CFTYPE_CUSTOM:
- want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL);
+ want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL, NULL);
break;
default: /* unknown or unsupported type */
@@ -2299,7 +2301,8 @@ InvokeExtProtocol(void *ptr,
URL_FILE *file,
CopyState pstate,
bool last_call,
- ExternalSelectDesc desc)
+ ExternalSelectDesc desc,
+ List **psplits)
{
FunctionCallInfoData fcinfo;
ExtProtocolData* extprotocol = file->u.custom.extprotocol;
@@ -2318,7 +2321,24 @@ InvokeExtProtocol(void *ptr,
extprotocol->prot_scanquals = file->u.custom.scanquals;
extprotocol->prot_last_call = last_call;
extprotocol->desc = desc;
+ extprotocol->splits = NULL;
+ if (psplits != NULL && *psplits != NULL) {
+ /*
+ * Move the splits from the argument list into this context structure.
+ * As a result, split data is handed over only on the first call.
+ */
+ while( list_length(*psplits)>0 )
+ {
+ FileSplit split = (FileSplit)lfirst(list_head(*psplits));
+ elog(LOG, "split %s:" INT64_FORMAT ", " INT64_FORMAT,
+ split->ext_file_uri_string,
+ split->offsets, split->lengths);
+ extprotocol->splits = lappend(extprotocol->splits, split);
+ *psplits = list_delete_first(*psplits);
+ }
+ }
+
InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
/* FmgrInfo */ extprotocol_udf,
/* nArgs */ 0,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/copyfuncs.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/copyfuncs.c b/src/backend/nodes/copyfuncs.c
index b16db04..b57c139 100644
--- a/src/backend/nodes/copyfuncs.c
+++ b/src/backend/nodes/copyfuncs.c
@@ -4219,6 +4219,7 @@ _copyFileSplitNode(FileSplitNode *from)
COPY_SCALAR_FIELD(logiceof);
COPY_SCALAR_FIELD(offsets);
COPY_SCALAR_FIELD(lengths);
+ COPY_STRING_FIELD(ext_file_uri_string);
return newnode;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/outfast.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/outfast.c b/src/backend/nodes/outfast.c
index 348bb2c..749e1d3 100644
--- a/src/backend/nodes/outfast.c
+++ b/src/backend/nodes/outfast.c
@@ -2201,6 +2201,7 @@ _outFileSplitNode(StringInfo str, FileSplitNode *node)
WRITE_INT64_FIELD(logiceof);
WRITE_INT64_FIELD(offsets);
WRITE_INT64_FIELD(lengths);
+ WRITE_STRING_FIELD(ext_file_uri_string);
}
static void
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/outfuncs.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/outfuncs.c b/src/backend/nodes/outfuncs.c
index 03a2550..cf6bf04 100644
--- a/src/backend/nodes/outfuncs.c
+++ b/src/backend/nodes/outfuncs.c
@@ -4053,6 +4053,7 @@ _outFileSplitNode(StringInfo str, FileSplitNode *node)
WRITE_INT64_FIELD(logiceof);
WRITE_INT64_FIELD(offsets);
WRITE_INT64_FIELD(lengths);
+ WRITE_STRING_FIELD(ext_file_uri_string);
}
static void
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/backend/nodes/readfast.c
----------------------------------------------------------------------
diff --git a/src/backend/nodes/readfast.c b/src/backend/nodes/readfast.c
index a77a217..1be9620 100644
--- a/src/backend/nodes/readfast.c
+++ b/src/backend/nodes/readfast.c
@@ -2294,6 +2294,7 @@ _readFileSplitNode(const char **str)
READ_INT64_FIELD(logiceof);
READ_INT64_FIELD(offsets);
READ_INT64_FIELD(lengths);
+ READ_STRING_FIELD(ext_file_uri_string);
READ_DONE();
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index c1aa724..674284b 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -50,7 +50,7 @@ typedef struct ExtProtocolData
bool prot_last_call;
List *prot_scanquals;
ExternalSelectDesc desc;
-
+ List *splits; /* splits to read from external protocol */
} ExtProtocolData;
typedef ExtProtocolData *ExtProtocol;
@@ -81,6 +81,17 @@ typedef enum ValidatorDirection
} ValidatorDirection;
/*
+ * Tells the validator whether to validate arguments when creating an external
+ * table or to fetch block location information. Passing the action this way
+ * avoids changing the catalog table.
+ */
+typedef enum ValidatorAction
+{
+ EXT_VALID_ACT_ARGUMENTS,
+ EXT_VALID_ACT_GETBLKLOC
+} ValidatorAction;
+
+/*
* ExtProtocolValidatorData is the node type that is passed as fmgr "context" info
* when a function is called by the External Table protocol manager.
*/
@@ -91,9 +102,16 @@ typedef struct ExtProtocolValidatorData
List *format_opts;
ValidatorDirection direction; /* validating read or write? */
char *errmsg; /* the validation error upon return, if any */
-
+ ValidatorAction action; /* indicate what action should be done. */
+ bool forceCreateDir;
} ExtProtocolValidatorData;
+typedef struct ExtProtocolBlockLocationData
+{
+ NodeTag type;
+ List *files; /* List of blocklocation_file*/
+} ExtProtocolBlockLocationData;
+
typedef ExtProtocolValidatorData *ExtProtocolValidator;
#define CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo) \
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/filesplit.h
----------------------------------------------------------------------
diff --git a/src/include/access/filesplit.h b/src/include/access/filesplit.h
index e627b07..2284d2a 100644
--- a/src/include/access/filesplit.h
+++ b/src/include/access/filesplit.h
@@ -40,6 +40,7 @@ typedef struct FileSplitNode
int64 logiceof;
int64 offsets;
int64 lengths;
+ char *ext_file_uri_string;
} FileSplitNode;
typedef FileSplitNode *FileSplit;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/formatter.h
----------------------------------------------------------------------
diff --git a/src/include/access/formatter.h b/src/include/access/formatter.h
index 326002c..f87afc2 100644
--- a/src/include/access/formatter.h
+++ b/src/include/access/formatter.h
@@ -48,7 +48,7 @@ typedef enum FmtNotification
typedef struct FormatterData
{
NodeTag type; /* see T_FormatterData */
-
+
/* metadata */
Relation fmt_relation;
TupleDesc fmt_tupDesc;
@@ -74,6 +74,11 @@ typedef struct FormatterData
bool fmt_needs_transcoding;
FmgrInfo* fmt_conversion_proc;
int fmt_external_encoding;
+
+ /* external url */
+ char *fmt_url;
+ /* splits */
+ List *fmt_splits;
} FormatterData;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/relscan.h
----------------------------------------------------------------------
diff --git a/src/include/access/relscan.h b/src/include/access/relscan.h
index bff990a..5fc6c76 100644
--- a/src/include/access/relscan.h
+++ b/src/include/access/relscan.h
@@ -18,9 +18,14 @@
#include "access/skey.h"
#include "access/memtup.h"
#include "access/aosegfiles.h"
+#include "access/plugstorage_utils.h"
+#include "nodes/plannodes.h"
#include "storage/bufpage.h"
#include "utils/tqual.h"
+/* forward declaration from nodes/execnodes.h */
+typedef struct ScanState ScanState;
+
typedef struct HeapScanDescData
{
/* scan parameters */
@@ -127,6 +132,16 @@ typedef struct FileScanDescData
/* custom data formatter */
FormatterData *fs_formatter;
+ /* formatter type and name */
+ int fs_formatter_type;
+ char *fs_formatter_name;
+
+ /* current scan information for pluggable format */
+ PlugStorageScanFuncs fs_ps_scan_funcs; /* scan functions */
+ void *fs_ps_user_data; /* user data */
+ ScanState *fs_ps_scan_state; /* support rescan */
+ Plan *fs_ps_plan; /* support rescan */
+
} FileScanDescData;
typedef FileScanDescData *FileScanDesc;
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/816782bd/src/include/access/url.h
----------------------------------------------------------------------
diff --git a/src/include/access/url.h b/src/include/access/url.h
index 307656a..b1ec102 100644
--- a/src/include/access/url.h
+++ b/src/include/access/url.h
@@ -159,7 +159,7 @@ extern URL_FILE *url_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pst
extern int url_fclose(URL_FILE *file, bool failOnError, const char *relname);
extern bool url_feof(URL_FILE *file, int bytesread);
extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen);
-extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc);
+extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc, List **psplits);
extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate);
extern void url_rewind(URL_FILE *file, const char *relname);
extern void url_fflush(URL_FILE *file, CopyState pstate);