You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2019/08/02 07:20:30 UTC

[hawq] 01/02: HAWQ-1734. Resolve insert issue in external table of orc

This is an automated email from the ASF dual-hosted git repository.

huor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit 33ddcf7aa87eaa47063496319681627158d76de9
Author: tuyu <tu...@oushu.io>
AuthorDate: Thu Aug 1 15:29:47 2019 +0800

    HAWQ-1734. Resolve insert issue in external table of orc
---
 src/backend/access/appendonly/appendonlywriter.c | 81 ++++++++++++++++++++++++
 src/backend/cdb/cdbdatalocality.c                | 33 ++++++++--
 src/backend/cdb/cdbquerycontextdispatching.c     | 26 ++++++--
 src/backend/commands/copy.c                      |  4 ++
 src/backend/executor/execMain.c                  | 68 ++++++++++++++++----
 src/include/access/appendonlywriter.h            |  1 +
 6 files changed, 189 insertions(+), 24 deletions(-)

diff --git a/src/backend/access/appendonly/appendonlywriter.c b/src/backend/access/appendonly/appendonlywriter.c
index addd33e..629ba3a 100644
--- a/src/backend/access/appendonly/appendonlywriter.c
+++ b/src/backend/access/appendonly/appendonlywriter.c
@@ -1185,6 +1185,72 @@ List *SetSegnoForWrite(List *existing_segnos, Oid relid, int segment_num,
 
 }
 
+/*
+ * SetSegnoForExternalWrite
+ *
+ * Choose the list of segment numbers used when writing to an external
+ * relation.  Same signature as SetSegnoForWrite(); the returned list has
+ * segment_num entries.
+ *
+ *   QE (GP_ROLE_EXECUTE):   existing_segnos is returned unchanged.
+ *   utility mode:           a single RESERVED_SEGNO.
+ *   QD (GP_ROLE_DISPATCH):  1..segment_num when forNewRel, otherwise
+ *                           0..segment_num-1.
+ *
+ * reuse_segfilenum_in_same_xid and keepHash are accepted only for signature
+ * compatibility and are currently ignored.
+ */
+List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num,
+		bool forNewRel, bool reuse_segfilenum_in_same_xid,
+		bool keepHash)
+{
+	int			first_segno;
+	int			i;
+
+	/*
+	 * The returned XID is not used.  NOTE(review): the call is kept only in
+	 * case something relies on its side effect (it may assign a top-level
+	 * XID); confirm and drop it.
+	 */
+	(void) GetTopTransactionId();
+
+	switch (Gp_role)
+	{
+		case GP_ROLE_EXECUTE:
+			Assert(existing_segnos != NIL);
+			Assert(list_length(existing_segnos) == segment_num);
+			return existing_segnos;
+
+		case GP_ROLE_UTILITY:
+			Assert(existing_segnos == NIL);
+			Assert(segment_num == 1);
+			return list_make1_int(RESERVED_SEGNO);
+
+		case GP_ROLE_DISPATCH:
+			Assert(existing_segnos == NIL);
+			Assert(segment_num > 0);
+
+			if (!forNewRel && Debug_appendonly_print_segfile_choice)
+				ereport(LOG, (errmsg("SetSegnoForExternalWrite: Choosing a segno for external "
+									 "relation \"%s\" (%u) ",
+									 get_rel_name(relid), relid)));
+
+			/* NOTE(review): 1-based for a new relation, 0-based otherwise. */
+			first_segno = forNewRel ? 1 : 0;
+			for (i = 0; i < segment_num; i++)
+				existing_segnos = lappend_int(existing_segnos, first_segno + i);
+
+			Assert(list_length(existing_segnos) == segment_num);
+			return existing_segnos;
+
+		default:
+			/* fix this for dispatch agent. for now it's broken anyway. */
+			Assert(false);
+			return NIL;
+	}
+}
+
+
 /*
  * assignPerRelSegno
  *
@@ -1231,6 +1297,21 @@ List *assignPerRelSegno(List *all_relids, int segment_num)
 			}
 
 		}
+		else if (RelationIsExternal(rel))
+		{
+			SegfileMapNode *n;
+
+			n = makeNode(SegfileMapNode);
+			n->relid = cur_relid;
+
+			n->segnos = SetSegnoForExternalWrite(NIL, cur_relid, segment_num,
+					false, true, true);
+
+			Assert(n->relid != InvalidOid);
+			Assert(n->segnos != NIL);
+
+			mapping = lappend(mapping, n);
+		}
 
         /*
          * hold RowExclusiveLock until the end of transaction
diff --git a/src/backend/cdb/cdbdatalocality.c b/src/backend/cdb/cdbdatalocality.c
index 55f4ac0..67fe51d 100644
--- a/src/backend/cdb/cdbdatalocality.c
+++ b/src/backend/cdb/cdbdatalocality.c
@@ -4125,15 +4125,34 @@ run_allocation_algorithm(SplitAllocResult *result, List *virtual_segments, Query
 		int fileCountInRelation = list_length(rel_data->files);
 		bool FileCountBucketNumMismatch = false;
 		if (targetPolicy->bucketnum > 0) {
-		  FileCountBucketNumMismatch = fileCountInRelation %
-		    targetPolicy->bucketnum == 0 ? false : true;
+			Relation rel = heap_open(rel_data->relid, NoLock);
+			if (!RelationIsExternal(rel))
+			{
+				FileCountBucketNumMismatch = fileCountInRelation %
+						targetPolicy->bucketnum == 0 ? false : true;
+			}
+			else
+			{
+				/* External tables: flag a file segno larger than bucketnum. */
+				ListCell *lc_file;
+				int maxsegno = 0;
+				foreach(lc_file, rel_data->files)
+				{
+					Relation_File *rel_file = (Relation_File *) lfirst(lc_file);
+					if (rel_file->segno > maxsegno)
+						maxsegno = rel_file->segno;
+				}
+				FileCountBucketNumMismatch =
+					maxsegno > targetPolicy->bucketnum ? true : false;
+			}
+			heap_close(rel, NoLock);
 		}
 		if (isRelationHash && FileCountBucketNumMismatch && !allow_file_count_bucket_num_mismatch) {
-		  elog(ERROR, "file count %d in catalog is not in proportion to the bucket "
-		      "number %d of hash table with oid=%u, some data may be lost, if you "
-		      "still want to continue the query by considering the table as random, set GUC "
-		      "allow_file_count_bucket_num_mismatch to on and try again.",
-		      fileCountInRelation, targetPolicy->bucketnum, myrelid);
+			elog(ERROR, "file count %d in catalog is not in proportion to the bucket "
+				"number %d of hash table with oid=%u, some data may be lost, if you "
+				"still want to continue the query by considering the table as random, set GUC "
+				"allow_file_count_bucket_num_mismatch to on and try again.",
+				fileCountInRelation, targetPolicy->bucketnum, myrelid);
 		}
 		/* change the virtual segment order when keep hash.
 		 * order of idMap should also be changed.
diff --git a/src/backend/cdb/cdbquerycontextdispatching.c b/src/backend/cdb/cdbquerycontextdispatching.c
index 88d4f44..167af2a 100644
--- a/src/backend/cdb/cdbquerycontextdispatching.c
+++ b/src/backend/cdb/cdbquerycontextdispatching.c
@@ -2955,10 +2955,6 @@ fetchSegFileInfos(Oid relid, List *segnos)
 		}
 
 		storageChar = get_relation_storage_type(relid);
-		/*
-		 * Get pg_appendonly information for this table.
-		 */
-		aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
 
 		/*
 		 * Based on the pg_appendonly information, fetch
@@ -2967,13 +2963,31 @@ fetchSegFileInfos(Oid relid, List *segnos)
 		 */
 		if (RELSTORAGE_AOROWS == storageChar)
 		{
+			/*
+			 * Get pg_appendonly information for this table.
+			 */
+			aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
 			AOFetchSegFileInfo(aoEntry, result, SnapshotNow);
 		}
-		else
+		else if (RELSTORAGE_PARQUET == storageChar)
 		{
-			Assert(RELSTORAGE_PARQUET == storageChar);
+			/*
+			 * Get pg_appendonly information for this table.
+			 */
+			aoEntry = GetAppendOnlyEntry(relid, SnapshotNow);
 			ParquetFetchSegFileInfo(aoEntry, result, SnapshotNow);
 		}
+		else
+		{
+			/*
+			 * Get range info for current magma hash table, which is already
+			 * available when get block location and do datalocality
+			 */
+			/*
+			 * TODO(Zongtian): vsegno -> rg list -> {range list1, ..., range list N}
+			 */
+			Assert(RELSTORAGE_EXTERNAL == storageChar);
+		}
 	}
 	return result;
 }
diff --git a/src/backend/commands/copy.c b/src/backend/commands/copy.c
index 9c21148..dbeda09 100644
--- a/src/backend/commands/copy.c
+++ b/src/backend/commands/copy.c
@@ -4331,6 +4331,9 @@ CopyFrom(CopyState cstate)
 												resultRelInfo->ri_aosegfileinfos,
 												GetQEIndex());
 
+							/* Zeroed stub PlannedStmt; only scantable_splits is filled in here. */
+							PlannedStmt *plannedstmt = palloc0(sizeof(PlannedStmt));
+							plannedstmt->scantable_splits = cstate->splits;
 							resultRelInfo->ri_extInsertDesc =
 									InvokePlugStorageFormatInsertInit(insertInitFunc,
 						                                      resultRelInfo->ri_RelationDesc,
@@ -4340,6 +4343,7 @@ CopyFrom(CopyState cstate)
 														                  segfileinfo->segno);
 
 							pfree(insertInitFunc);
+							pfree(plannedstmt);
 						}
 						else
 						{
diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c
index 81f68f2..e21aeb6 100644
--- a/src/backend/executor/execMain.c
+++ b/src/backend/executor/execMain.c
@@ -875,8 +875,9 @@ ExecutorStart(QueryDesc *queryDesc, int eflags)
                         result_segfileinfos = GetResultRelSegFileInfos(RelationGetRelid(relinfo->ri_RelationDesc),
                                                                        estate->es_result_aosegnos, result_segfileinfos);
                     }
+
                     plannedstmt->result_segfileinfos = result_segfileinfos;
-                    
+
                     if (plannedstmt->intoClause != NULL)
                     {
                         List *segment_segnos = SetSegnoForWrite(NIL, 0, GetQEGangNum(), true, true, false);
@@ -1791,7 +1792,6 @@ InitializeResultRelations(PlannedStmt *plannedstmt, EState *estate, CmdType oper
 		}
 
 	}
-
 	estate->es_partition_state = NULL;
 	if (estate->es_result_partitions)
 	{
@@ -2432,7 +2432,7 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List *mapping)
 	Relation rel = heap_open(relid, AccessShareLock);
 
 	/* only relevant for AO relations */
-	if(!RelationIsAoRows(rel)  && !RelationIsParquet(rel))
+	if(!RelationIsAoRows(rel)  && !RelationIsParquet(rel) && !RelationIsExternal(rel))
 	{
 		heap_close(rel, AccessShareLock);
 		return;
@@ -2466,6 +2466,45 @@ CreateAppendOnlyParquetSegFileOnMaster(Oid relid, List *mapping)
 	Assert(found);
 }
 
+/*
+ * Append one SharedStorageOpTask per segno in segnos to addTasks, each
+ * naming rel's RelFileNode, the segno and a palloc'd copy of rel's name;
+ * addTasks->tasks is doubled in size whenever it is full.
+ * NOTE(review): the only call site is commented out, so this static
+ * function is currently unused (likely a "defined but not used" warning).
+ */
+static void CreateExternalSegFileForRelationOnMaster(Relation rel, List *segnos,
+		SharedStorageOpTasks *addTasks)
+{
+	char	   *relname = RelationGetRelationName(rel);
+	ListCell   *cell;
+
+	Assert(RelationIsExternal(rel));
+
+	foreach(cell, segnos)
+	{
+		SharedStorageOpTask *task;
+
+		Assert(NULL != addTasks);
+		Assert(addTasks->sizeTasks >= addTasks->numTasks);
+
+		if (addTasks->sizeTasks == addTasks->numTasks)
+		{
+			addTasks->tasks = repalloc(addTasks->tasks,
+					addTasks->sizeTasks * sizeof(SharedStorageOpTask) * 2);
+			addTasks->sizeTasks *= 2;
+		}
+
+		task = &addTasks->tasks[addTasks->numTasks];
+		task->node.dbNode = rel->rd_node.dbNode;
+		task->node.relNode = rel->rd_node.relNode;
+		task->node.spcNode = rel->rd_node.spcNode;
+		task->segno = lfirst_int(cell);
+		task->relname = pstrdup(relname);
+		addTasks->numTasks++;
+	}
+}
+
 	static void
 CreaateAoRowSegFileForRelationOnMaster(Relation rel,
 		AppendOnlyEntry * aoEntry, List *segnos, SharedStorageOpTasks *addTask, SharedStorageOpTasks *overwriteTask)
@@ -2647,13 +2686,20 @@ CreateAppendOnlyParquetSegFileForRelationOnMaster(Relation rel, List *segnos)
 			CreateParquetSegFileForRelationOnMaster(rel, aoEntry, segnos, addTasks, overwriteTasks);
 
 		pfree(aoEntry);
+
+		PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
+		PostPerformSharedStorageOpTasks(addTasks);
+		PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile);
 	}
+	// TODO: Should we create empty files on orc hash distribution table?
+	// else if (RelationIsExternal(rel))
+	// {
+	// 	CreateExternalSegFileForRelationOnMaster(rel, segnos, addTasks);
+	// 	PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
+	// }
 
-	PerformSharedStorageOpTasks(addTasks, Op_CreateSegFile);
-	PostPerformSharedStorageOpTasks(addTasks);
-  PerformSharedStorageOpTasks(overwriteTasks, Op_OverWriteSegFile);
 	DropSharedStorageOpTasks(addTasks);
-  DropSharedStorageOpTasks(overwriteTasks);
+	DropSharedStorageOpTasks(overwriteTasks);
 }
 
 /*
@@ -2712,10 +2758,10 @@ ResultRelInfoSetSegFileInfo(ResultRelInfo *resultRelInfo, List *mapping)
 	/*
 	 * Only relevant for AO relations.
 	 */
-	if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo)))
-	{
-		return;
-	}
+//	if (!relstorage_is_ao(RelinfoGetStorage(resultRelInfo)))
+//	{
+//		return;
+//	}
 
 	Assert(mapping);
 	Assert(resultRelInfo->ri_RelationDesc);
diff --git a/src/include/access/appendonlywriter.h b/src/include/access/appendonlywriter.h
index a943bcb..38d9354 100755
--- a/src/include/access/appendonlywriter.h
+++ b/src/include/access/appendonlywriter.h
@@ -129,6 +129,7 @@ extern void InitAppendOnlyWriter(void);
 extern Size AppendOnlyWriterShmemSize(void);
 extern bool TestCurrentTspSupportTruncate(Oid tsp);
 extern List *SetSegnoForWrite(List *existing_segnos, Oid relid, int segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash);
+extern List *SetSegnoForExternalWrite(List *existing_segnos, Oid relid, int segment_num, bool forNewRel, bool reuse_segfilenum_in_same_xid, bool keepHash);
 extern List *assignPerRelSegno(List *all_rels, int segment_num);
 extern void UpdateMasterAosegTotals(Relation parentrel,
 									int segno,