You are viewing a plain-text version of this content. The canonical link to it ("here") was a hyperlink that is not preserved in this plain-text rendering.
Posted to commits@hawq.apache.org by hu...@apache.org on 2019/08/02 07:20:30 UTC
[hawq] 01/02: HAWQ-1734. Resolve insert issue in external table of orc
This is an automated email from the ASF dual-hosted git repository.
huor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hawq.git
commit 33ddcf7aa87eaa47063496319681627158d76de9
Author: tuyu <tu...@oushu.io>
AuthorDate: Thu Aug 1 15:29:47 2019 +0800
HAWQ-1734. Resolve insert issue in external table of orc
---
src/backend/access/appendonly/appendonlywriter.c | 81 ++++++++++++++++++++++++
src/backend/cdb/cdbdatalocality.c | 33 ++++++++--
src/backend/cdb/cdbquerycontextdispatching.c | 26 ++++++--
src/backend/commands/copy.c | 4 ++
src/backend/executor/execMain.c | 68 ++++++++++++++++----
src/include/access/appendonlywriter.h | 1 +
6 files changed, 189 insertions(+), 24 deletions(-)
diff --git a/src/backend/access/appendonly/appendonlywriter.c b/src/backend/access/appendonly/appendonlywriter.c
index addd33e..629ba3a 100644
--- a/src/backend/access/appendonly/appendonlywriter.c
+++ b/src/backend/access/appendonly/appendonlywriter.c
@@ -1185,6 +1185,72 @@ List *SetSegnoForWrite(List *existing_segnos, Oid relid, int segment_num,
}
+/*
+ * SetSegnoForExternalWrite
+ *
+ * Choose the list of segment numbers a write into an external relation
+ * will use, one entry per segment.  Unlike SetSegnoForWrite, no per-relation
+ * segfile bookkeeping is consulted.
+ *
+ * GP_ROLE_EXECUTE:  the dispatcher-supplied existing_segnos is returned as is.
+ * GP_ROLE_UTILITY:  a single-element list holding RESERVED_SEGNO.
+ * GP_ROLE_DISPATCH: segnos 1..segment_num when forNewRel, otherwise
+ *                   0..segment_num-1.
+ * NOTE(review): the two dispatch numberings differ (1-based vs 0-based);
+ * confirm the 0-based range is intended and cannot collide with
+ * RESERVED_SEGNO.
+ *
+ * reuse_segfilenum_in_same_xid and keepHash are accepted only so the
+ * signature mirrors SetSegnoForWrite; they are not used here.
+ */
+List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num,
+                               bool forNewRel, bool reuse_segfilenum_in_same_xid,
+                               bool keepHash)
+{
+    int first_segno;
+    int i;
+
+    switch(Gp_role)
+    {
+        case GP_ROLE_EXECUTE:
+            /* The dispatcher already chose the segnos for this QE. */
+            Assert(existing_segnos != NIL);
+            Assert(list_length(existing_segnos) == segment_num);
+            return existing_segnos;
+
+        case GP_ROLE_UTILITY:
+            Assert(existing_segnos == NIL);
+            Assert(segment_num == 1);
+            return list_make1_int(RESERVED_SEGNO);
+
+        case GP_ROLE_DISPATCH:
+            Assert(existing_segnos == NIL);
+            Assert(segment_num > 0);
+
+            if (!forNewRel && Debug_appendonly_print_segfile_choice)
+            {
+                ereport(LOG, (errmsg("SetSegnoForExternalWrite: Choosing a segno for external "
+                                     "relation \"%s\" (%u) ",
+                                     get_rel_name(relid), relid)));
+            }
+
+            /* New relations are numbered from 1, existing ones from 0. */
+            first_segno = forNewRel ? 1 : 0;
+            for (i = 0; i < segment_num; i++)
+                existing_segnos = lappend_int(existing_segnos, first_segno + i);
+
+            Assert(list_length(existing_segnos) == segment_num);
+            return existing_segnos;
+
+        default:
+            /* Any other role is not supported for external writes. */
+            Assert(false);
+            return NIL;
+    }
+}
+
+
/*
* assignPerRelSegno
*
@@ -1231,6 +1297,21 @@ List *assignPerRelSegno(List *all_relids, int segment_num)
}
}
+ else if (RelationIsExternal(rel))
+ {
+ SegfileMapNode *n;
+
+ n = makeNode(SegfileMapNode);
+ n->relid = cur_relid;
+
+ n->segnos = SetSegnoForExternalWrite(NIL, cur_relid, segment_num,
+ false, true, true);
+
+ Assert(n->relid != InvalidOid);
+ Assert(n->segnos != NIL);
+
+ mapping = lappend(mapping, n);
+ }
/*
* hold RowExclusiveLock until the end of transaction
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index 55f4ac0..67fe51d 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -4125,15 +4125,34 @@ run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, Query
int fileCountInRelation = list_length(rel_data->files);
bool FileCountBucketNumMismatch = false;
if (targetPolicy->bucketnum > 0) {
- FileCountBucketNumMismatch = fileCountInRelation %
- targetPolicy->bucketnum == 0 ? false : true;
+ Relation rel = heap_open(rel_data->relid, NoLock);
+ targetPolicy->bucketnum == 0 ? false : true;
+ if (!RelationIsExternal(rel))
+ {
+ FileCountBucketNumMismatch = fileCountInRelation %
+ targetPolicy->bucketnum == 0 ? false : true;
+ }
+ else
+ {
+ ListCell *lc_file;
+ int maxsegno = 0;
+ foreach(lc_file, rel_data->files)
+ {
+ Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
+ if (rel_file->segno > maxsegno)
+ maxsegno = rel_file->segno;
+ }
+ FileCountBucketNumMismatch =
+ maxsegno > targetPolicy->bucketnum ? true : false;
+ }
+ heap_close(rel, NoLock);
}
if (isRelationHash && FileCountBucketNumMismatch && !allow_file_count_bucket_num_mismatch) {
- elog(ERROR, "file count %d in catalog is not in proportion to the bucket "
- "number %d of hash table with oid=%u, some data may be lost, if you "
- "still want to continue the query by considering the table as random, set GUC "
- "allow_file_count_bucket_num_mismatch to on and try again.",
- fileCountInRelation, targetPolicy->bucketnum, myrelid);
+ elog(ERROR, "file count %d in catalog is not in proportion to the bucket "
+ "number %d of hash table with oid=%u, some data may be lost, if you "
+ "still want to continue the query by considering the table as random, set GUC "
+ "allow_file_count_bucket_num_mismatch to on and try again.",
+ fileCountInRelation, targetPolicy->bucketnum, myrelid);
}
/* change the virtual segment order when keep hash.
* order of idMap should also be changed.
diff --git a/src/backend/cdb/cdbquerycontextdispatching.c b/src/backend/cdb/cdbquerycontextdispatching.c
index 88d4f44..167af2a 100644
--- a/src/backend/cdb/cdbquerycontextdispatching.c
+++ b/src/backend/cdb/cdbquerycontextdispatching.c
@@ -2955,10 +2955,6 @@ fetchSegFileInfos(Oid relid, List *segnos)
}
storageChar = get_relation_storage_type(relid);
- /*
- * Get pg_appendonly information for this table.
- */
- aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
/*
* Based on the pg_appendonly information, fetch
@@ -2967,13 +2963,31 @@ fetchSegFileInfos(Oid relid, List *segnos)
*/
if (RELSTORAGE_AOROWS == storageChar)
{
+ /*
+ * Get pg_appendonly information for this table.
+ */
+ aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
AOFetchSegFileInfo(aoEntry, result, SnapshotNow);
}
- else
+ else if (RELSTORAGE_PARQUET == storageChar)
{
- Assert(RELSTORAGE_PARQUET == storageChar);
+ /*
+ * Get pg_appendonly information for this table.
+ */
+ aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
ParquetFetchSegFileInfo(aoEntry, result, SnapshotNow);
}
+ else
+ {
+ /*
+ * Get range info for current magma hash table, which is already
+ * available when get block location and do datalocality
+ */
+ /*
+ * TODO(Zongtian): vsegno -> rg list -> {range list1, ..., range list N}
+ */
+ Assert(RELSTORAGE_EXTERNAL == storageChar);
+ }
}
return result;
}
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9c21148..dbeda09 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -4331,6 +4331,9 @@ CopyFrom(CopyState cstate)
resultRelInfo->ri_aosegfileinfos,
GetQEIndex());
+ PlannedStmt* plannedstmt = palloc(sizeof(PlannedStmt));
+ memset(plannedstmt,0,sizeof(PlannedStmt));
+ plannedstmt->scantable_splits = cstate->splits;
resultRelInfo->ri_extInsertDesc =
InvokePlugStorageFormatInsertInit(insertInitFunc,
resultRelInfo->ri_RelationDesc,
@@ -4340,6 +4343,7 @@ CopyFrom(CopyState cstate)
segfileinfo->segno);
pfree(insertInitFunc);
+ pfree(plannedstmt);
}
else
{
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 81f68f2..e21aeb6 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -875,8 +875,9 @@ ExecutorStart(QueryDesc *queryDesc, int eflags)
result_segfileinfos = GetResultRelSegFileInfos(RelationGetRelid(relinfo->ri_RelationDesc),
estate->es_result_aosegnos, result_segfileinfos);
}
+
plannedstmt->result_segfileinfos = result_segfileinfos;
-
+
if (plannedstmt->intoClause != NULL)
{
List *segment_segnos = SetSegnoForWrite(NIL, 0, GetQEGangNum(), true, true, false);
@@ -1791,7 +1792,6 @@ InitializeResultRelations(PlannedStmt *plannedstmt, EState *estate, CmdType oper
}
}
-
estate->es_partition_state = NULL;
if (estate->es_result_partitions)
{
@@ -2432,7 +2432,7 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List *mapping)
Relation rel = heap_open(relid, AccessShareLock);
/* only relevant for AO relations */
- if(!RelationIsAoRows(rel) && !RelationIsParquet(rel))
+ if(!RelationIsAoRows(rel) && !RelationIsParquet(rel) && !RelationIsExternal(rel))
{
heap_close(rel, AccessShareLock);
return;
@@ -2466,6 +2466,45 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List *mapping)
Assert(found);
}
+/*
+ * CreateExternalSegFileForRelationOnMaster
+ *
+ * Append one shared-storage task to addTasks for every segno in segnos,
+ * each targeting rel's RelFileNode and carrying a palloc'd copy of the
+ * relation name.  This only queues the tasks; performing them is the
+ * caller's job.
+ *
+ * NOTE(review): the only call site is currently commented out, so this
+ * static function is unreferenced and may trigger an unused-function warning.
+ */
+static void CreateExternalSegFileForRelationOnMaster(Relation rel, List *segnos,
+                                                     SharedStorageOpTasks *addTasks)
+{
+    const char *relname = RelationGetRelationName(rel);
+    ListCell   *cell;
+
+    Assert(RelationIsExternal(rel));
+    Assert(NULL != addTasks);
+
+    foreach(cell, segnos)
+    {
+        SharedStorageOpTask *task;
+
+        Assert(addTasks->sizeTasks >= addTasks->numTasks);
+
+        /*
+         * Double the task array when it is full.
+         * NOTE(review): assumes sizeTasks > 0 on entry; a zero-sized
+         * array would not grow here.
+         */
+        if (addTasks->sizeTasks == addTasks->numTasks)
+        {
+            addTasks->tasks = repalloc(addTasks->tasks,
+                    addTasks->sizeTasks * sizeof(SharedStorageOpTask) * 2);
+            addTasks->sizeTasks *= 2;
+        }
+
+        task = &addTasks->tasks[addTasks->numTasks];
+        task->node.dbNode = rel->rd_node.dbNode;
+        task->node.relNode = rel->rd_node.relNode;
+        task->node.spcNode = rel->rd_node.spcNode;
+        task->segno = lfirst_int(cell);
+        task->relname = pstrdup(relname);
+
+        addTasks->numTasks++;
+    }
+}
+
static void
CreaateAoRowSegFileForRelationOnMaster(Relation rel,
AppendOnlyEntry * aoEntry, List *segnos, SharedStorageOpTasks *addTask, SharedStorageOpTasks *overwriteTask)
@@ -2647,13 +2686,20 @@ CreateAppendOnlyParquetSegFileForRelationOnMaster(Relation rel, List *segnos)
CreateParquetSegFileForRelationOnMaster(rel, aoEntry, segnos, addTasks, overwriteTasks);
pfree(aoEntry);
+
+ PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
+ PostPerformSharedStorageOpTasks(addTasks);
+ PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile);
}
+ // TODO: Should we create empty files on orc hash distribution table?
+ // else if (RelationIsExternal(rel))
+ // {
+ // CreateExternalSegFileForRelationOnMaster(rel, segnos, addTasks);
+ // PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
+ // }
- PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
- PostPerformSharedStorageOpTasks(addTasks);
- PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile);
DropSharedStorageOpTasks(addTasks);
- DropSharedStorageOpTasks(overwriteTasks);
+ DropSharedStorageOpTasks(overwriteTasks);
}
/*
@@ -2712,10 +2758,10 @@ ResultRelInfoSetSegFileInfo(ResultRelInfo *resultRelInfo, List *mapping)
/*
* Only relevant for AO relations.
*/
- if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo)))
- {
- return;
- }
+// if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo)))
+// {
+// return;
+// }
Assert(mapping);
Assert(resultRelInfo->ri_RelationDesc);
diff --git a/src/include/access/appendonlywriter.h b/src/include/access/appendonlywriter.h
index a943bcb..38d9354 100755
--- a/src/include/access/appendonlywriter.h
+++ b/src/include/access/appendonlywriter.h
@@ -129,6 +129,7 @@ extern void InitAppendOnlyWriter(void);
extern Size AppendOnlyWriterShmemSize(void);
extern bool TestCurrentTspSupportTruncate(Oid tsp);
extern List *SetSegnoForWrite(List *existing_segnos, Oid relid, int segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash);
+extern List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash);
extern List *assignPerRelSegno(List *all_rels, int segment_num);
extern void UpdateMasterAosegTotals(Relation parentrel,
int segno,