You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2017/06/21 23:35:22 UTC

incubator-hawq git commit: fix share input scan bug for writer part

Repository: incubator-hawq
Updated Branches:
  refs/heads/master 4ef7022e7 -> 339806f3a


fix share input scan bug for writer part


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/339806f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/339806f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/339806f3

Branch: refs/heads/master
Commit: 339806f3a40cf85686496412984e65ebfb481dbd
Parents: 4ef7022
Author: amyrazz44 <ab...@pivotal.io>
Authored: Mon May 8 17:27:07 2017 +0800
Committer: Ruilong Huo <hu...@Ruilongs-MacBook-Pro.local>
Committed: Thu Jun 22 07:32:19 2017 +0800

----------------------------------------------------------------------
 src/backend/executor/nodeMaterial.c       |  38 +++++-
 src/backend/executor/nodeShareInputScan.c | 158 ++++++++++++++++++++++++-
 src/backend/utils/misc/guc.c              |  12 +-
 src/include/executor/nodeMaterial.h       |   1 +
 src/include/executor/nodeShareInputScan.h |   2 +
 src/include/utils/guc.h                   |   2 +
 6 files changed, 203 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/executor/nodeMaterial.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index f2b82b2..4589351 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -41,19 +41,21 @@
 #include "postgres.h"
 
 #include "executor/executor.h"
-#include "executor/nodeMaterial.h"
 #include "executor/instrument.h"        /* Instrumentation */
 #include "utils/tuplestorenew.h"
-
+#include "executor/nodeMaterial.h"
 #include "miscadmin.h"
 
 #include "cdb/cdbvars.h"
+#include "postmaster/primary_mirror_mode.h"
 
+
+static int sisc_writer_lock_fd = -1;
 static void ExecMaterialExplainEnd(PlanState *planstate, struct StringInfoData *buf);
 static void ExecChildRescan(MaterialState *node, ExprContext *exprCtxt);
 static void DestroyTupleStore(MaterialState *node);
 static void ExecMaterialResetWorkfileState(MaterialState *node);
-
+static void mkLockFileForWriter(int size, int share_id, char * name);
 
 /* ----------------------------------------------------------------
  *		ExecMaterial
@@ -115,6 +117,7 @@ ExecMaterial(MaterialState *node)
 
 			ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true);
 			tsa = ntuplestore_create_accessor(ts, true);
+			mkLockFileForWriter(MAXPGPATH, ma->share_id, "writer");
 		}
 		else
 		{
@@ -247,6 +250,8 @@ ExecMaterial(MaterialState *node)
 
 					node->share_lk_ctxt = shareinput_writer_notifyready(ma->share_id, ma->nsharer_xslice,
 							estate->es_plannedstmt->planGen);
+					if(sisc_writer_lock_fd > 0)
+						close(sisc_writer_lock_fd);
 				}
 			}
 			return NULL;
@@ -759,3 +764,30 @@ ExecEagerFreeMaterial(MaterialState *node)
 	}
 }
 
+
+/*
+ * mkLockFileForWriter
+ *
+ * Create the writer's lock file for share_id and try to take an exclusive,
+ * non-blocking flock() on it. The descriptor is kept in sisc_writer_lock_fd,
+ * so the lock lasts until that descriptor is closed or the writer process quits.
+ * Raises ERROR if the file cannot be created; a failed flock() is only logged.
+ */
+static void mkLockFileForWriter(int size, int share_id, char * name)
+{
+	char *lock_file = (char *) palloc0(size);
+
+	generate_lock_file_name(lock_file, size, share_id, name);
+	elog(DEBUG3, "The lock file for writer in SISC is %s", lock_file);
+	/* POSIX requires an explicit access mode; the fd is only used for flock(). */
+	sisc_writer_lock_fd = open(lock_file, O_RDONLY | O_CREAT, S_IRWXU);
+	if(sisc_writer_lock_fd < 0)
+		elog(ERROR, "Could not create lock file %s for writer in SISC. The error number is %d", lock_file, errno);
+
+	if(flock(sisc_writer_lock_fd, LOCK_EX | LOCK_NB) == 0)
+		elog(LOG, "Successfully locked lock file  \"%s\" for writer in SISC.", lock_file);
+	else
+		elog(DEBUG3, "Could not lock lock file  \"%s\" for writer in SISC . The error number is %d", lock_file, errno);
+
+	pfree(lock_file);
+}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/executor/nodeShareInputScan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeShareInputScan.c b/src/backend/executor/nodeShareInputScan.c
index 049943b..88c695d 100644
--- a/src/backend/executor/nodeShareInputScan.c
+++ b/src/backend/executor/nodeShareInputScan.c
@@ -43,7 +43,6 @@
 #include "cdb/cdbvars.h"
 #include "executor/executor.h"
 #include "executor/nodeShareInputScan.h"
-
 #include "utils/tuplestorenew.h"
 #include "miscadmin.h"
 
@@ -552,11 +551,59 @@ static void sisc_lockname(char* p, int size, int share_id, const char* name)
 	}
 }
 
+
+/*
+ * Build a palloc'd lock file path for lk_ctxt: names starting with "writer" are
+ * appended to lkname_ready, all other names to lkname_done. Callers pfree it.
+ */
+char *joint_lock_file_name(ShareInput_Lk_Context *lk_ctxt, char *name)
+{
+	char *path = palloc0(MAXPGPATH);
+	const char *base;
+
+	base = (strncmp("writer", name, strlen("writer")) == 0) ? lk_ctxt->lkname_ready : lk_ctxt->lkname_done;
+	/* palloc0 zero-fills the buffer, so it starts out as an empty string. */
+	strncat(path, base, MAXPGPATH - strlen(path) - 1);
+	strncat(path, name, MAXPGPATH - strlen(path) - 1);
+	return path;
+}
+
+/*
+ * drop_lock_files
+ *
+ * Unlink the writer and reader lock files derived from lk_ctxt, if present.
+ */
+void drop_lock_files(ShareInput_Lk_Context *lk_ctxt)
+{
+	char *writer_lock_file = joint_lock_file_name(lk_ctxt, "writer");
+	char *reader_lock_file = joint_lock_file_name(lk_ctxt, "reader");
+
+	if(access(writer_lock_file, F_OK) == 0)
+	{
+		elog(DEBUG3, "Drop writer's lock files %s in SISC", writer_lock_file);
+		unlink(writer_lock_file);
+	}
+	else
+		elog(DEBUG3, "Writer's lock files %s has been dropped already in SISC", writer_lock_file);
+
+	if(access(reader_lock_file, F_OK) == 0)
+	{
+		elog(DEBUG3, "Drop reader's lock files %s in SISC", reader_lock_file);
+		/* Must remove the reader's file here, not the writer's path. */
+		unlink(reader_lock_file);
+	}
+	else
+		elog(DEBUG3, "Reader's lock files %s has been dropped already in SISC", reader_lock_file);
+	pfree(writer_lock_file);
+	pfree(reader_lock_file);
+}
+
 static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
 {
 	int err;
 
 	elog(DEBUG1, "shareinput_clean_lk_ctxt cleanup lk ctxt %p", lk_ctxt);
+	
 
 	if(lk_ctxt->readyfd >= 0)
 	{
@@ -590,6 +637,10 @@ static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
 		lk_ctxt->del_done = false;
 	}
 
+	elog(DEBUG3, "Begin to drop all the lock files for SISC");
+	drop_lock_files(lk_ctxt);
+	elog(DEBUG3, "End of drop lock files for SISC");
+
 	gp_free2 (lk_ctxt, sizeof(ShareInput_Lk_Context));
 }
 
@@ -666,6 +717,29 @@ write_retry:
 	return 0;
 }
 
+
+
+/*
+ * generate_lock_file_name
+ *
+ * Called by reader or writer to make the unique lock file name.
+ */
+void generate_lock_file_name(char* p, int size, int share_id, const char* name)
+{
+	const char *suffix;
+
+	/* Names starting with "writer" build on "ready"; all others on "done". */
+	if (strncmp(name , "writer", strlen("writer")) != 0)
+		suffix = "done";
+	else
+		suffix = "ready";
+
+	sisc_lockname(p, size, share_id, suffix);
+	strncat(p, name, size - strlen(p) - 1);
+}
+
+
+
 /* 
  * Readiness (a) synchronization.
  *
@@ -709,6 +783,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 	struct timeval tval;
 	int n;
 	char a;
+	int file_exists = -1;
+	int timeout_interval = 0;
+	bool flag = false; //A tag for file exists or not.
+	int lock_fd = -1;
+	int lock = -1;
+	bool is_lock_firsttime = true;
+	char *writer_lock_file = NULL; //current path for lock file.
 
 	ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context));
 
@@ -738,6 +819,9 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 	if(pctxt->donefd < 0)
 		elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);
 
+	writer_lock_file = joint_lock_file_name(pctxt, "writer");
+	elog(DEBUG3, "The lock file of writer in SISC is %s", writer_lock_file);
+
 	while(1)
 	{
 		CHECK_FOR_INTERRUPTS();
@@ -773,13 +857,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 			retry_read(pctxt->readyfd, &a, 1);
 			Assert(rwsize == 1 && a == 'a');
 
-			elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
+			elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
 					share_id, currentSliceId);
 
 			if (planGen == PLANGEN_PLANNER)
 			{
 				/* For planner-generated plans, we send ack back after receiving the handshake */
-				elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
+				elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
 						share_id, currentSliceId);
 
 #if USE_ASSERT_CHECKING
@@ -793,8 +877,70 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 		}
 		else if(n==0)
 		{
-			elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
-					share_id, currentSliceId);
+			file_exists = access(writer_lock_file, F_OK);	
+			if(file_exists != 0)
+			{
+				elog(DEBUG3, "Wait lock file for writer time out interval is %d", timeout_interval);
+				if(timeout_interval >= share_input_scan_wait_lockfile_timeout || flag == true) //If lock file never exists or disappeared, reader will no longer waiting for writer
+				{
+					elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready time out and break",
+						share_id, currentSliceId);
+					pfree(writer_lock_file);
+					break;
+				}
+				timeout_interval += tval.tv_sec * 1000 + tval.tv_usec;
+			}
+			else
+			{
+				elog(LOG, "writer lock file of shareinput_reader_waitready() is %s", writer_lock_file);
+				flag = true;
+				lock_fd = open(writer_lock_file, O_RDONLY);
+				if(lock_fd < 0)
+				{
+					elog(DEBUG3, "Open writer's lock file %s failed!, error number is %d", writer_lock_file, errno);
+					continue;
+				}
+				lock = flock(lock_fd, LOCK_EX | LOCK_NB);
+				if(lock == -1)
+				{
+					/*
+					 * The reader keeps trying to lock the lock file that the writer created. Succeeding
+					 * means the writer process has quit; failing means the writer still holds the lock,
+					 * i.e. the writer process is healthy.
+					 */
+					elog(DEBUG3, "Lock writer's lock file %s failed!, error number is %d", writer_lock_file, errno);
+				}
+				else if(lock == 0)
+				{
+					/*
+					 * There is one situation to consider.
+					 * The writer needs some time to lock the lock file after the file has been created.
+					 * So, if the reader locks the file ahead of the writer, the reader must unlock it again.
+					 * If the reader locks the file after the writer did, the writer process has aborted.
+					 * In that case we break the loop so that the reader no longer waits for the writer.
+					 */  
+					if(is_lock_firsttime == true)  
+					{
+						lock = flock(lock_fd, LOCK_UN); 
+						is_lock_firsttime = false;
+						elog(DEBUG3, "Lock writer's lock file %s first time successfully in SISC! Unlock it.", writer_lock_file);
+						continue;
+					}
+					else
+					{
+						elog(LOG, "Lock writer's lock file %s successfully in SISC!", writer_lock_file);
+						/* Retry to close the fd in case there is interruption from signal */
+						while ((close(lock_fd) < 0) && (errno == EINTR))
+						{
+							elog(DEBUG3, "Failed to close SISC temporary file due to strerror(errno)");
+						}
+						pfree(writer_lock_file);
+						break; 
+					}
+				}
+				elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
+						share_id, currentSliceId);
+			}
 		}
 		else
 		{
@@ -802,6 +948,7 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 			elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready try again, errno %d ... ",
 					share_id, currentSliceId, save_errno);
 		}
+
 	}
 	return (void *) pctxt;
 }
@@ -853,7 +1000,6 @@ shareinput_writer_notifyready(int share_id, int xslice, PlanGenerator planGen)
 	pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600);
 	if(pctxt->donefd < 0)
 		elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);
-
 	for(n=0; n<xslice; ++n)
 	{
 #if USE_ASSERT_CHECKING

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6769d3b..87f44c2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -668,6 +668,8 @@ bool		gp_cte_sharing = false;
 
 char	   *gp_idf_deduplicate_str;
 
+int share_input_scan_wait_lockfile_timeout;
+
 /* gp_disable_catalog_access_on_segment */
 bool gp_disable_catalog_access_on_segment = false;
 
@@ -6685,6 +6687,15 @@ static struct config_int ConfigureNamesInt[] =
 		&metadata_cache_max_hdfs_file_num,
 		524288, 32768, 8388608, NULL, NULL
 	},
+	{
+		{"share_input_scan_wait_lockfile_timeout", PGC_USERSET, DEVELOPER_OPTIONS,
+			gettext_noop("timeout (in millisecond) for waiting lock file which writer creates."),
+			NULL
+		},
+		&share_input_scan_wait_lockfile_timeout,
+		300000, 1, 65536, NULL, NULL
+	},
+
 
 	/* End-of-list marker */
 	{
@@ -8349,7 +8360,6 @@ static struct config_string ConfigureNamesString[] =
 		NULL, NULL, NULL
 	},
 
-
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/executor/nodeMaterial.h
----------------------------------------------------------------------
diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h
index 509f78c..41eb6f7 100644
--- a/src/include/executor/nodeMaterial.h
+++ b/src/include/executor/nodeMaterial.h
@@ -15,6 +15,7 @@
 #define NODEMATERIAL_H
 
 #include "nodes/execnodes.h"
+#include "executor/nodeShareInputScan.h"
 
 extern int	ExecCountSlotsMaterial(Material *node);
 extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/executor/nodeShareInputScan.h
----------------------------------------------------------------------
diff --git a/src/include/executor/nodeShareInputScan.h b/src/include/executor/nodeShareInputScan.h
index 23025b9..730ffe1 100644
--- a/src/include/executor/nodeShareInputScan.h
+++ b/src/include/executor/nodeShareInputScan.h
@@ -30,6 +30,7 @@
 #define NODESHAREINPUTSCAN_H
 
 #include "nodes/execnodes.h"
+
 extern int ExecCountSlotsShareInputScan(ShareInputScan* node);
 extern ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecShareInputScan(ShareInputScanState *node);
@@ -52,4 +53,5 @@ static inline gpmon_packet_t * GpmonPktFromShareInputState(ShareInputScanState *
 	return &node->ss.ps.gpmon_pkt;
 }
 
+extern void generate_lock_file_name(char* p, int size, int share_id, const char* name);
 #endif   /* NODESHAREINPUTSCAN_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/utils/guc.h
----------------------------------------------------------------------
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 546584f..530abe5 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -444,6 +444,8 @@ extern bool optimizer_prefer_scalar_dqa_multistage_agg;
 extern bool optimizer_parallel_union;
 extern bool optimizer_array_constraints;
 
+/* Timeout for shareinputscan writer/reader wait for lock files */ 
+extern int share_input_scan_wait_lockfile_timeout;
 
 /* fallback in ranger ACL check */
 extern int information_schema_namespace_oid;