You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by sh...@apache.org on 2016/07/20 23:42:25 UTC
incubator-hawq git commit: HAWQ-927. Pass ProjectionInfo data to PXF
Repository: incubator-hawq
Updated Branches:
refs/heads/master 2f95286e0 -> 24d4d967e
HAWQ-927. Pass ProjectionInfo data to PXF
This commit makes the HAWQ-side changes needed to pass PXF the list of
indices of the projected columns (sent as the X-GP-ATTRS-PROJ and X-GP-ATTRS-PROJ-IDX HTTP headers).
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/24d4d967
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/24d4d967
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/24d4d967
Branch: refs/heads/master
Commit: 24d4d967e767fc640ad6ab3c91b2ce26bb96de66
Parents: 2f95286
Author: Kavinder Dhaliwal <ka...@gmail.com>
Authored: Mon Jul 11 18:20:16 2016 -0700
Committer: Kavinder Dhaliwal <ka...@gmail.com>
Committed: Wed Jul 20 16:36:08 2016 -0700
----------------------------------------------------------------------
src/backend/access/external/fileam.c | 37 +++++++++++++++++----------
src/backend/access/external/pxfheaders.c | 35 +++++++++++++++++++++++--
src/backend/access/external/url.c | 15 ++++++-----
src/backend/executor/nodeExternalscan.c | 6 +++--
src/bin/gpfusion/gpbridgeapi.c | 2 ++
src/include/access/extprotocol.h | 5 +++-
src/include/access/fileam.h | 14 +++++++++-
src/include/access/pxfheaders.h | 1 +
src/include/access/url.h | 2 +-
9 files changed, 91 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/fileam.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/fileam.c b/src/backend/access/external/fileam.c
index 645e6dc..70a115a 100644
--- a/src/backend/access/external/fileam.c
+++ b/src/backend/access/external/fileam.c
@@ -79,7 +79,7 @@
#include "cdb/cdbutil.h"
#include "cdb/cdbvars.h"
-static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir);
+static HeapTuple externalgettup(FileScanDesc scan, ScanDirection dir, ExternalSelectDesc desc);
static void InitParseState(CopyState pstate, Relation relation,
Datum* values, bool* nulls, bool writable,
List *fmtOpts, char fmtType,
@@ -97,7 +97,7 @@ static void FunctionCallPrepareFormatter(FunctionCallInfoData* fcinfo,
static void open_external_readable_source(FileScanDesc scan);
static void open_external_writable_source(ExternalInsertDesc extInsertDesc);
-static int external_getdata(URL_FILE *extfile, CopyState pstate, int maxread);
+static int external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc);
static void external_senddata(URL_FILE *extfile, CopyState pstate);
static void external_scan_error_callback(void *arg);
void readHeaderLine(CopyState pstate);
@@ -454,6 +454,17 @@ external_stopscan(FileScanDesc scan)
}
}
+/* ----------------
+ *		external_getnext_init - prepare an ExternalSelectDesc for external_getnext
+ *
+ * Returns a freshly palloc0'd ExternalSelectDesc.  When a PlanState is given,
+ * its ps_ProjInfo is recorded so that the external protocol can learn which
+ * columns the scan projects; with state == NULL projInfo stays NULL (it may
+ * also be NULL if the node itself has no ProjectionInfo).
+ *
+ * The caller owns the result and must pfree it when done.
+ * ----------------
+ */
+ExternalSelectDesc
+external_getnext_init(PlanState *state)
+{
+	ExternalSelectDesc desc = (ExternalSelectDesc) palloc0(sizeof(ExternalSelectDescData));
+
+	if (state != NULL)
+		desc->projInfo = state->ps_ProjInfo;
+
+	return desc;
+}
/* ----------------------------------------------------------------
* external_getnext
@@ -462,7 +473,7 @@ external_stopscan(FileScanDesc scan)
* ----------------------------------------------------------------
*/
HeapTuple
-external_getnext(FileScanDesc scan, ScanDirection direction)
+external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc)
{
HeapTuple tuple;
@@ -486,7 +497,7 @@ external_getnext(FileScanDesc scan, ScanDirection direction)
/* Note: no locking manipulations needed */
FILEDEBUG_1;
- tuple = externalgettup(scan, direction);
+ tuple = externalgettup(scan, direction, desc);
if (tuple == NULL)
@@ -969,7 +980,7 @@ static DataLineStatus parse_next_line(FileScanDesc scan)
}
static HeapTuple
-externalgettup_defined(FileScanDesc scan)
+externalgettup_defined(FileScanDesc scan, ExternalSelectDesc desc)
{
HeapTuple tuple = NULL;
CopyState pstate = scan->fs_pstate;
@@ -981,7 +992,7 @@ externalgettup_defined(FileScanDesc scan)
/* need to fill our buffer with data? */
if (pstate->raw_buf_done)
{
- pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE);
+ pstate->bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
pstate->begloc = pstate->raw_buf;
pstate->raw_buf_done = (pstate->bytesread==0);
pstate->raw_buf_index = 0;
@@ -1072,7 +1083,7 @@ externalgettup_defined(FileScanDesc scan)
}
static HeapTuple
-externalgettup_custom(FileScanDesc scan)
+externalgettup_custom(FileScanDesc scan, ExternalSelectDesc desc)
{
HeapTuple tuple;
CopyState pstate = scan->fs_pstate;
@@ -1088,7 +1099,7 @@ externalgettup_custom(FileScanDesc scan)
/* need to fill our buffer with data? */
if (pstate->raw_buf_done)
{
- int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE);
+ int bytesread = external_getdata((URL_FILE*)scan->fs_file, pstate, RAW_BUF_SIZE, desc);
if ( bytesread > 0 )
appendBinaryStringInfo(&formatter->fmt_databuf, pstate->raw_buf, bytesread);
pstate->raw_buf_done = false;
@@ -1230,7 +1241,7 @@ externalgettup_custom(FileScanDesc scan)
*/
static HeapTuple
externalgettup(FileScanDesc scan,
- ScanDirection dir __attribute__((unused)))
+ ScanDirection dir __attribute__((unused)), ExternalSelectDesc desc)
{
CopyState pstate = scan->fs_pstate;
@@ -1252,9 +1263,9 @@ externalgettup(FileScanDesc scan,
}
if (!custom)
- return externalgettup_defined(scan); /* text/csv */
+ return externalgettup_defined(scan, desc); /* text/csv */
else
- return externalgettup_custom(scan); /* custom */
+ return externalgettup_custom(scan, desc); /* custom */
}
/*
@@ -1747,7 +1758,7 @@ close_external_source(FILE *dataSource, bool failOnError, const char *relname)
* get a chunk of data from the external data file.
*/
static int
-external_getdata(URL_FILE *extfile, CopyState pstate, int maxread)
+external_getdata(URL_FILE *extfile, CopyState pstate, int maxread, ExternalSelectDesc desc)
{
int bytesread = 0;
@@ -1759,7 +1770,7 @@ external_getdata(URL_FILE *extfile, CopyState pstate, int maxread)
*/
- bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate);
+ bytesread = url_fread((void *) pstate->raw_buf, 1, maxread, extfile, pstate, desc);
if (url_feof(extfile, bytesread))
{
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/pxfheaders.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/pxfheaders.c b/src/backend/access/external/pxfheaders.c
index 579e705..e653d30 100644
--- a/src/backend/access/external/pxfheaders.c
+++ b/src/backend/access/external/pxfheaders.c
@@ -34,6 +34,7 @@ static void add_location_options_httpheader(CHURL_HEADERS headers, GPHDUri *gphd
static char* prepend_x_gp(const char* key);
static void add_delegation_token_headers(CHURL_HEADERS headers, PxfInputData *inputData);
static void add_remote_credentials(CHURL_HEADERS headers);
+static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo);
/*
* Add key/value pairs to connection header.
@@ -47,6 +48,7 @@ void build_http_header(PxfInputData *input)
GPHDUri *gphduri = input->gphduri;
Relation rel = input->rel;
char *filterstr = input->filterstr;
+ ProjectionInfo *proj_info = input->proj_info;
if (rel != NULL)
{
@@ -60,6 +62,11 @@ void build_http_header(PxfInputData *input)
add_tuple_desc_httpheader(headers, rel);
}
+ if (proj_info != NULL)
+ {
+ add_projection_desc_httpheader(headers, proj_info);
+ }
+
/* GP cluster configuration */
external_set_env_vars(&ev, gphduri->uri, false, NULL, NULL, false, 0);
@@ -123,7 +130,8 @@ static void add_alignment_size_httpheader(CHURL_HEADERS headers)
*/
static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel)
{
- char long_number[32];
+ char long_number[sizeof(int32) * 8];
+
StringInfoData formatter;
TupleDesc tuple;
initStringInfo(&formatter);
@@ -133,7 +141,7 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel)
/* Convert the number of attributes to a string */
pg_ltoa(tuple->natts, long_number);
- churl_headers_append(headers, "X-GP-ATTRS", long_number);
+ churl_headers_append(headers, "X-GP-ATTRS", long_number);
/* Iterate attributes */
for (int i = 0; i < tuple->natts; ++i)
@@ -158,6 +166,29 @@ static void add_tuple_desc_httpheader(CHURL_HEADERS headers, Relation rel)
pfree(formatter.data);
}
+/*
+ * Report the projected columns of the scan to PXF.
+ *
+ * Sends X-GP-ATTRS-PROJ with the number of target-list entries, followed by
+ * one X-GP-ATTRS-PROJ-IDX header per entry carrying the zero-based column
+ * index (pi_varNumbers holds 1-based attribute numbers, hence the "- 1").
+ * The repeated header name is intentional: each index is a separate header
+ * line with the same key.
+ *
+ * The executor only fills pi_varNumbers when the target list is a plain list
+ * of Vars; for any other projection (expressions, etc.) it is NULL and must
+ * not be indexed.  In that case no projection headers are sent at all, which
+ * is the same as having no ProjectionInfo, so PXF returns every column.
+ *
+ * NOTE(review): columns referenced only by the scan's quals are not part of
+ * the target list.  If PXF drops non-projected columns, HAWQ-side qual
+ * evaluation would see them as missing -- confirm the PXF side handles this.
+ */
+static void add_projection_desc_httpheader(CHURL_HEADERS headers, ProjectionInfo *projInfo)
+{
+	char		long_number[sizeof(int32) * 8];
+	int		   *varNumbers = projInfo->pi_varNumbers;
+	int			numProjCols = list_length(projInfo->pi_targetlist);
+	int			i;
+
+	/* Not a simple Var projection: no per-column mapping to report. */
+	if (varNumbers == NULL)
+		return;
+
+	/* Convert the number of projection columns to a string */
+	pg_ltoa(numProjCols, long_number);
+	churl_headers_append(headers, "X-GP-ATTRS-PROJ", long_number);
+
+	for (i = 0; i < numProjCols; i++)
+	{
+		/* attribute numbers are 1-based; PXF expects 0-based column indices */
+		pg_ltoa(varNumbers[i] - 1, long_number);
+		churl_headers_append(headers, "X-GP-ATTRS-PROJ-IDX", long_number);
+	}
+}
+
/*
* The options in the LOCATION statement of "create extenal table"
* FRAGMENTER=HdfsDataFragmenter&ACCESSOR=SequenceFileAccessor...
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/access/external/url.c
----------------------------------------------------------------------
diff --git a/src/backend/access/external/url.c b/src/backend/access/external/url.c
index 62a7ce0..97cdb92 100644
--- a/src/backend/access/external/url.c
+++ b/src/backend/access/external/url.c
@@ -86,7 +86,8 @@ static int32 InvokeExtProtocol(void *ptr,
size_t nbytes,
URL_FILE *file,
CopyState pstate,
- bool last_call);
+ bool last_call,
+ ExternalSelectDesc desc);
void extract_http_domain(char* i_path, char* o_domain, int dlen);
@@ -1270,7 +1271,7 @@ url_fclose(URL_FILE *file, bool failOnError, const char *relname)
/* last call. let the user close custom resources */
if(file->u.custom.protocol_udf)
- (void) InvokeExtProtocol(NULL, 0, file, NULL, true);
+ (void) InvokeExtProtocol(NULL, 0, file, NULL, true, NULL);
/* now clean up everything not cleaned by user */
MemoryContextDelete(file->u.custom.protcxt);
@@ -1774,7 +1775,7 @@ static size_t curl_fwrite(char *buf, int nbytes, URL_FILE* file, CopyState pstat
size_t
-url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate)
+url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc)
{
size_t want;
int n;
@@ -1821,7 +1822,7 @@ url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate
case CFTYPE_CUSTOM:
- want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false);
+ want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, desc);
break;
default: /* unknown or supported type */
@@ -1859,7 +1860,7 @@ url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstat
case CFTYPE_CUSTOM:
- want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false);
+ want = (size_t) InvokeExtProtocol(ptr, nmemb * size, file, pstate, false, NULL);
break;
default: /* unknown or unsupported type */
@@ -2296,7 +2297,8 @@ InvokeExtProtocol(void *ptr,
size_t nbytes,
URL_FILE *file,
CopyState pstate,
- bool last_call)
+ bool last_call,
+ ExternalSelectDesc desc)
{
FunctionCallInfoData fcinfo;
ExtProtocolData* extprotocol = file->u.custom.extprotocol;
@@ -2314,6 +2316,7 @@ InvokeExtProtocol(void *ptr,
extprotocol->prot_maxbytes = nbytes;
extprotocol->prot_scanquals = file->u.custom.scanquals;
extprotocol->prot_last_call = last_call;
+ extprotocol->desc = desc;
InitFunctionCallInfoData(/* FunctionCallInfoData */ fcinfo,
/* FmgrInfo */ extprotocol_udf,
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/backend/executor/nodeExternalscan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeExternalscan.c b/src/backend/executor/nodeExternalscan.c
index b19b25e..2831faa 100644
--- a/src/backend/executor/nodeExternalscan.c
+++ b/src/backend/executor/nodeExternalscan.c
@@ -66,6 +66,7 @@ ExternalNext(ExternalScanState *node)
EState *estate;
ScanDirection direction;
TupleTableSlot *slot;
+ ExternalSelectDesc externalSelectDesc;
/*
* get information from the estate and scan state
@@ -79,7 +80,8 @@ ExternalNext(ExternalScanState *node)
/*
* get the next tuple from the file access methods
*/
- tuple = external_getnext(scandesc, direction);
+ externalSelectDesc = external_getnext_init(&(node->ss.ps));
+ tuple = external_getnext(scandesc, direction, externalSelectDesc);
/*
* save the tuple and the buffer returned to us by the access methods in
@@ -113,7 +115,7 @@ ExternalNext(ExternalScanState *node)
ExecEagerFreeExternalScan(node);
}
}
-
+ pfree(externalSelectDesc);
return slot;
}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/bin/gpfusion/gpbridgeapi.c
----------------------------------------------------------------------
diff --git a/src/bin/gpfusion/gpbridgeapi.c b/src/bin/gpfusion/gpbridgeapi.c
index 011b0fc..465c38d 100644
--- a/src/bin/gpfusion/gpbridgeapi.c
+++ b/src/bin/gpfusion/gpbridgeapi.c
@@ -68,6 +68,7 @@ void free_token_resources(PxfInputData *inputData);
Datum gpbridge_import(PG_FUNCTION_ARGS)
{
gpbridge_check_inside_extproto(fcinfo, "gpbridge_import");
+// ExternalSelectDesc desc = EXTPROTOCOL_GET_SELECTDESC(fcinfo);
if (gpbridge_last_call(fcinfo))
PG_RETURN_INT32(gpbridge_cleanup(fcinfo));
@@ -181,6 +182,7 @@ void add_querydata_to_http_header(gphadoop_context* context, PG_FUNCTION_ARGS)
inputData.gphduri = context->gphd_uri;
inputData.rel = EXTPROTOCOL_GET_RELATION(fcinfo);
inputData.filterstr = serializePxfFilterQuals(EXTPROTOCOL_GET_SCANQUALS(fcinfo));
+ inputData.proj_info = EXTPROTOCOL_GET_PROJINFO(fcinfo);
add_delegation_token(&inputData);
build_http_header(&inputData);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/extprotocol.h
----------------------------------------------------------------------
diff --git a/src/include/access/extprotocol.h b/src/include/access/extprotocol.h
index d70c844..ca7d492 100644
--- a/src/include/access/extprotocol.h
+++ b/src/include/access/extprotocol.h
@@ -33,6 +33,8 @@
/* ------------------------- I/O function API -----------------------------*/
+/*
+ * Opaque handle for per-scan select state (struct ExternalSelectDescData,
+ * defined in access/fileam.h).  Only a pointer is stored in ExtProtocolData,
+ * so a forward declaration suffices here -- presumably to avoid including
+ * fileam.h from the protocol API header.
+ *
+ * NOTE(review): fileam.h repeats this same typedef.  Redefining a typedef is
+ * only valid from C11 on; a translation unit that includes both headers may
+ * get a redefinition error or warning from pre-C11 compilers.
+ */
+struct ExternalSelectDescData;
+typedef struct ExternalSelectDescData *ExternalSelectDesc;
/*
* ExtProtocolData is the node type that is passed as fmgr "context" info
* when a function is called by the External Table protocol manager.
@@ -47,6 +49,7 @@ typedef struct ExtProtocolData
void *prot_user_ctx;
bool prot_last_call;
List *prot_scanquals;
+ ExternalSelectDesc desc;
} ExtProtocolData;
@@ -61,13 +64,13 @@ typedef ExtProtocolData *ExtProtocol;
#define EXTPROTOCOL_GET_DATALEN(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_maxbytes)
#define EXTPROTOCOL_GET_SCANQUALS(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_scanquals)
#define EXTPROTOCOL_GET_USER_CTX(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_user_ctx)
+/*
+ * ExternalSelectDesc of the current protocol call.  It can be NULL: url.c
+ * passes NULL on the last call (url_fclose) and on the write path
+ * (url_fwrite).
+ */
+#define EXTPROTOCOL_GET_SELECTDESC(fcinfo) (((ExtProtocolData*) (fcinfo)->context)->desc)
+/* Projection info of the scan, or NULL when no ExternalSelectDesc was passed. */
+#define EXTPROTOCOL_GET_PROJINFO(fcinfo) \
+	(EXTPROTOCOL_GET_SELECTDESC(fcinfo) != NULL ? \
+	 EXTPROTOCOL_GET_SELECTDESC(fcinfo)->projInfo : NULL)
#define EXTPROTOCOL_IS_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call)
#define EXTPROTOCOL_SET_LAST_CALL(fcinfo) (((ExtProtocolData*) fcinfo->context)->prot_last_call = true)
#define EXTPROTOCOL_SET_USER_CTX(fcinfo, p) \
(((ExtProtocolData*) fcinfo->context)->prot_user_ctx = p)
-
/* ------------------------- Validator function API -----------------------------*/
typedef enum ValidatorDirection
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/fileam.h
----------------------------------------------------------------------
diff --git a/src/include/access/fileam.h b/src/include/access/fileam.h
index 145cb84..5a5f532 100644
--- a/src/include/access/fileam.h
+++ b/src/include/access/fileam.h
@@ -63,6 +63,17 @@ typedef struct ExternalInsertDescData
typedef ExternalInsertDescData *ExternalInsertDesc;
+/*
+ * ExternalSelectDescData is used for storing state related
+ * to selecting data from an external table.
+ *
+ * It is created by external_getnext_init(), passed to external_getnext(),
+ * and carried through externalgettup()/external_getdata() into url_fread()
+ * and InvokeExtProtocol(), which exposes it to the protocol function as
+ * ExtProtocolData.desc.
+ *
+ * NOTE(review): extprotocol.h also declares
+ * "typedef struct ExternalSelectDescData *ExternalSelectDesc;".  Repeating a
+ * typedef is only valid from C11 on; including both headers in one
+ * translation unit may fail or warn with pre-C11 compilers.
+ */
+typedef struct ExternalSelectDescData
+{
+ /*
+ * Projection of the scan node (PlanState.ps_ProjInfo); NULL when no
+ * PlanState was supplied or the node has no ProjectionInfo.
+ */
+ ProjectionInfo *projInfo;
+} ExternalSelectDescData;
+
+typedef ExternalSelectDescData *ExternalSelectDesc;
+
typedef enum DataLineStatus
{
LINE_OK,
@@ -80,7 +91,8 @@ extern FileScanDesc external_beginscan(Relation relation, Index scanrelid,
extern void external_rescan(FileScanDesc scan);
extern void external_endscan(FileScanDesc scan);
extern void external_stopscan(FileScanDesc scan);
-extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction);
+extern ExternalSelectDesc external_getnext_init(PlanState *state);
+extern HeapTuple external_getnext(FileScanDesc scan, ScanDirection direction, ExternalSelectDesc desc);
extern ExternalInsertDesc external_insert_init(Relation rel, int errAosegno);
extern Oid external_insert(ExternalInsertDesc extInsertDesc, HeapTuple instup);
extern void external_insert_finish(ExternalInsertDesc extInsertDesc);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/pxfheaders.h
----------------------------------------------------------------------
diff --git a/src/include/access/pxfheaders.h b/src/include/access/pxfheaders.h
index 13853ab..da3da7f 100644
--- a/src/include/access/pxfheaders.h
+++ b/src/include/access/pxfheaders.h
@@ -44,6 +44,7 @@ typedef struct sPxfInputData
Relation rel;
char *filterstr;
PxfHdfsToken token;
+ ProjectionInfo *proj_info;
} PxfInputData;
void build_http_header(PxfInputData *input);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/24d4d967/src/include/access/url.h
----------------------------------------------------------------------
diff --git a/src/include/access/url.h b/src/include/access/url.h
index d837b68..9de492d 100644
--- a/src/include/access/url.h
+++ b/src/include/access/url.h
@@ -159,7 +159,7 @@ extern URL_FILE *url_fopen(char *url, bool forwrite, extvar_t *ev, CopyState pst
extern int url_fclose(URL_FILE *file, bool failOnError, const char *relname);
extern bool url_feof(URL_FILE *file, int bytesread);
extern bool url_ferror(URL_FILE *file, int bytesread, char *ebuf, int ebuflen);
-extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate);
+extern size_t url_fread(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate, ExternalSelectDesc desc);
extern size_t url_fwrite(void *ptr, size_t size, size_t nmemb, URL_FILE *file, CopyState pstate);
extern void url_rewind(URL_FILE *file, const char *relname);
extern void url_fflush(URL_FILE *file, CopyState pstate);