You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by rl...@apache.org on 2017/07/03 05:47:03 UTC

[2/2] incubator-hawq git commit: Revert "fix share input scan bug for writer part" and related commits.

Revert "fix share input scan bug for writer part" and related commits.

The reverted commit changed only the writer part, which causes problems in some situations.
It should be committed again once all of the required functional changes are included.

This reverts commit 339806f3a40cf85686496412984e65ebfb481dbd.


Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/4aae1a07
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/4aae1a07
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/4aae1a07

Branch: refs/heads/master
Commit: 4aae1a07670ebc820e32d525709f562356e97235
Parents: a85b0ce
Author: rlei <rl...@pivotal.io>
Authored: Mon Jul 3 13:44:14 2017 +0800
Committer: rlei <rl...@pivotal.io>
Committed: Mon Jul 3 13:44:14 2017 +0800

----------------------------------------------------------------------
 src/backend/executor/nodeMaterial.c       |  38 +-----
 src/backend/executor/nodeShareInputScan.c | 158 +------------------------
 src/backend/utils/misc/guc.c              |  12 +-
 src/include/executor/nodeMaterial.h       |   1 -
 src/include/executor/nodeShareInputScan.h |   2 -
 src/include/utils/guc.h                   |   2 -
 6 files changed, 10 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4aae1a07/src/backend/executor/nodeMaterial.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index 4589351..f2b82b2 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -41,21 +41,19 @@
 #include "postgres.h"
 
 #include "executor/executor.h"
+#include "executor/nodeMaterial.h"
 #include "executor/instrument.h"        /* Instrumentation */
 #include "utils/tuplestorenew.h"
-#include "executor/nodeMaterial.h"
+
 #include "miscadmin.h"
 
 #include "cdb/cdbvars.h"
-#include "postmaster/primary_mirror_mode.h"
 
-
-static int sisc_writer_lock_fd = -1;
 static void ExecMaterialExplainEnd(PlanState *planstate, struct StringInfoData *buf);
 static void ExecChildRescan(MaterialState *node, ExprContext *exprCtxt);
 static void DestroyTupleStore(MaterialState *node);
 static void ExecMaterialResetWorkfileState(MaterialState *node);
-static void mkLockFileForWriter(int size, int share_id, char * name);
+
 
 /* ----------------------------------------------------------------
  *		ExecMaterial
@@ -117,7 +115,6 @@ ExecMaterial(MaterialState *node)
 
 			ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true);
 			tsa = ntuplestore_create_accessor(ts, true);
-			mkLockFileForWriter(MAXPGPATH, ma->share_id, "writer");
 		}
 		else
 		{
@@ -250,8 +247,6 @@ ExecMaterial(MaterialState *node)
 
 					node->share_lk_ctxt = shareinput_writer_notifyready(ma->share_id, ma->nsharer_xslice,
 							estate->es_plannedstmt->planGen);
-					if(sisc_writer_lock_fd > 0)
-						close(sisc_writer_lock_fd);
 				}
 			}
 			return NULL;
@@ -764,30 +759,3 @@ ExecEagerFreeMaterial(MaterialState *node)
 	}
 }
 
-
-/*
- * mkLockFileForWriter
- * 
- * Create a unique lock file for writer, then use flock() to lock/unlock the lock file.
- * We can make sure the lock file will be locked forerver until the writer process quits.
- */
-static void mkLockFileForWriter(int size, int share_id, char * name)
-{
-	char *lock_file;
-	int lock;
-
-	lock_file = (char *)palloc0(size);
-	generate_lock_file_name(lock_file, size, share_id, name);
-	elog(DEBUG3, "The lock file for writer in SISC is %s", lock_file);
-	sisc_writer_lock_fd = open(lock_file, O_CREAT, S_IRWXU);
-	if(sisc_writer_lock_fd < 0)
-	{
-		elog(ERROR, "Could not create lock file %s for writer in SISC. The error number is %d", lock_file, errno);
-	}
-	lock = flock(sisc_writer_lock_fd, LOCK_EX | LOCK_NB);
-	if(lock == -1)
-		elog(DEBUG3, "Could not lock lock file  \"%s\" for writer in SISC . The error number is %d", lock_file, errno);
-	else if(lock == 0)
-		elog(LOG, "Successfully locked lock file  \"%s\" for writer in SISC.", lock_file);
-	pfree(lock_file);
-}

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4aae1a07/src/backend/executor/nodeShareInputScan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeShareInputScan.c b/src/backend/executor/nodeShareInputScan.c
index 88c695d..049943b 100644
--- a/src/backend/executor/nodeShareInputScan.c
+++ b/src/backend/executor/nodeShareInputScan.c
@@ -43,6 +43,7 @@
 #include "cdb/cdbvars.h"
 #include "executor/executor.h"
 #include "executor/nodeShareInputScan.h"
+
 #include "utils/tuplestorenew.h"
 #include "miscadmin.h"
 
@@ -551,59 +552,11 @@ static void sisc_lockname(char* p, int size, int share_id, const char* name)
 	}
 }
 
-
-char *joint_lock_file_name(ShareInput_Lk_Context *lk_ctxt, char *name)
-{
-	char *lock_file = palloc0(MAXPGPATH);
-
-	if(strncmp("writer", name, strlen("writer")) ==0 )
-	{
-		strncat(lock_file, lk_ctxt->lkname_ready, MAXPGPATH - strlen(lock_file) - 1);
-	}
-	else
-	{
-		strncat(lock_file, lk_ctxt->lkname_done, MAXPGPATH - strlen(lock_file) - 1);
-	}
-	strncat(lock_file, name, MAXPGPATH - strlen(lock_file) -1);
-	return lock_file;	
-}
-
-void drop_lock_files(ShareInput_Lk_Context *lk_ctxt)
-{
-	char *writer_lock_file = NULL;
-	char *reader_lock_file = NULL;
-
-	writer_lock_file = joint_lock_file_name(lk_ctxt, "writer");
-	if(access(writer_lock_file, F_OK) == 0)
-	{
-		elog(DEBUG3, "Drop writer's lock files %s in SISC", writer_lock_file);
-		unlink(writer_lock_file);
-	}
-	else
-	{
-		elog(DEBUG3, "Writer's lock files %s has been dropped already in SISC", writer_lock_file);
-	}
-	pfree(writer_lock_file);
-	reader_lock_file = joint_lock_file_name(lk_ctxt, "reader");	
-	if(access(reader_lock_file, F_OK) == 0)
-	{
-		elog(DEBUG3, "Drop reader's lock files %s in SISC", reader_lock_file);
-		unlink(writer_lock_file);
-	}
-	else
-	{
-		elog(DEBUG3, "Reader's lock files %s has been dropped already in SISC", reader_lock_file);
-	}
-	pfree(reader_lock_file);
-	
-}
-
 static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
 {
 	int err;
 
 	elog(DEBUG1, "shareinput_clean_lk_ctxt cleanup lk ctxt %p", lk_ctxt);
-	
 
 	if(lk_ctxt->readyfd >= 0)
 	{
@@ -637,10 +590,6 @@ static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
 		lk_ctxt->del_done = false;
 	}
 
-	elog(DEBUG3, "Begin to drop all the lock files for SISC");
-	drop_lock_files(lk_ctxt);
-	elog(DEBUG3, "End of drop lock files for SISC");
-
 	gp_free2 (lk_ctxt, sizeof(ShareInput_Lk_Context));
 }
 
@@ -717,29 +666,6 @@ write_retry:
 	return 0;
 }
 
-
-
-/*
- * generate_lock_file_name
- *
- * Called by reader or writer to make the unique lock file name.
- */
-void generate_lock_file_name(char* p, int size, int share_id, const char* name)
-{
-	if (strncmp(name , "writer", strlen("writer")) == 0)
-	{
-		sisc_lockname(p, size, share_id, "ready");
-		strncat(p, name, size - strlen(p) - 1);
-	}
-	else
-	{
-		sisc_lockname(p, size, share_id, "done");
-		strncat(p, name, size - strlen(p) - 1);
-	}
-}
-
-
-
 /* 
  * Readiness (a) synchronization.
  *
@@ -783,13 +709,6 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 	struct timeval tval;
 	int n;
 	char a;
-	int file_exists = -1;
-	int timeout_interval = 0;
-	bool flag = false; //A tag for file exists or not.
-	int lock_fd = -1;
-	int lock = -1;
-	bool is_lock_firsttime = true;
-	char *writer_lock_file = NULL; //current path for lock file.
 
 	ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context));
 
@@ -819,9 +738,6 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 	if(pctxt->donefd < 0)
 		elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);
 
-	writer_lock_file = joint_lock_file_name(pctxt, "writer");
-	elog(DEBUG3, "The lock file of writer in SISC is %s", writer_lock_file);
-
 	while(1)
 	{
 		CHECK_FOR_INTERRUPTS();
@@ -857,13 +773,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 			retry_read(pctxt->readyfd, &a, 1);
 			Assert(rwsize == 1 && a == 'a');
 
-			elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
+			elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
 					share_id, currentSliceId);
 
 			if (planGen == PLANGEN_PLANNER)
 			{
 				/* For planner-generated plans, we send ack back after receiving the handshake */
-				elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
+				elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
 						share_id, currentSliceId);
 
 #if USE_ASSERT_CHECKING
@@ -877,70 +793,8 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 		}
 		else if(n==0)
 		{
-			file_exists = access(writer_lock_file, F_OK);	
-			if(file_exists != 0)
-			{
-				elog(DEBUG3, "Wait lock file for writer time out interval is %d", timeout_interval);
-				if(timeout_interval >= share_input_scan_wait_lockfile_timeout || flag == true) //If lock file never exists or disappeared, reader will no longer waiting for writer
-				{
-					elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready time out and break",
-						share_id, currentSliceId);
-					pfree(writer_lock_file);
-					break;
-				}
-				timeout_interval += tval.tv_sec * 1000 + tval.tv_usec;
-			}
-			else
-			{
-				elog(LOG, "writer lock file of shareinput_reader_waitready() is %s", writer_lock_file);
-				flag = true;
-				lock_fd = open(writer_lock_file, O_RDONLY);
-				if(lock_fd < 0)
-				{
-					elog(DEBUG3, "Open writer's lock file %s failed!, error number is %d", writer_lock_file, errno);
-					continue;
-				}
-				lock = flock(lock_fd, LOCK_EX | LOCK_NB);
-				if(lock == -1)
-				{
-					/*
-					 * Reader try to lock the lock file which writer created until locked the lock file successfully 
-                     * which means that writer process quit. If reader lock the lock file failed, it means that writer
-					 * process is healthy.
-					 */
-					elog(DEBUG3, "Lock writer's lock file %s failed!, error number is %d", writer_lock_file, errno);
-				}
-				else if(lock == 0)
-				{
-					/*
-					 * There is one situation to consider about.
-					 * Writer need a time interval to lock the lock file after the lock file has been created.
-					 * So, if reader lock the lock file ahead of writer, we should unlock it.
-					 * If reader lock the lock file after writer, it means that writer process has abort.
-					 * We should break the loop to make sure reader no longer wait for writer.
-					 */  
-					if(is_lock_firsttime == true)  
-					{
-						lock = flock(lock_fd, LOCK_UN); 
-						is_lock_firsttime = false;
-						elog(DEBUG3, "Lock writer's lock file %s first time successfully in SISC! Unlock it.", writer_lock_file);
-						continue;
-					}
-					else
-					{
-						elog(LOG, "Lock writer's lock file %s successfully in SISC!", writer_lock_file);
-						/* Retry to close the fd in case there is interruption from signal */
-						while ((close(lock_fd) < 0) && (errno == EINTR))
-						{
-							elog(DEBUG3, "Failed to close SISC temporary file due to strerror(errno)");
-						}
-						pfree(writer_lock_file);
-						break; 
-					}
-				}
-				elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
-						share_id, currentSliceId);
-			}
+			elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
+					share_id, currentSliceId);
 		}
 		else
 		{
@@ -948,7 +802,6 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
 			elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready try again, errno %d ... ",
 					share_id, currentSliceId, save_errno);
 		}
-
 	}
 	return (void *) pctxt;
 }
@@ -1000,6 +853,7 @@ shareinput_writer_notifyready(int share_id, int xslice, PlanGenerator planGen)
 	pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600);
 	if(pctxt->donefd < 0)
 		elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);
+
 	for(n=0; n<xslice; ++n)
 	{
 #if USE_ASSERT_CHECKING

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4aae1a07/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 87f44c2..6769d3b 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -668,8 +668,6 @@ bool		gp_cte_sharing = false;
 
 char	   *gp_idf_deduplicate_str;
 
-int share_input_scan_wait_lockfile_timeout;
-
 /* gp_disable_catalog_access_on_segment */
 bool gp_disable_catalog_access_on_segment = false;
 
@@ -6687,15 +6685,6 @@ static struct config_int ConfigureNamesInt[] =
 		&metadata_cache_max_hdfs_file_num,
 		524288, 32768, 8388608, NULL, NULL
 	},
-	{
-		{"share_input_scan_wait_lockfile_timeout", PGC_USERSET, DEVELOPER_OPTIONS,
-			gettext_noop("timeout (in millisecond) for waiting lock file which writer creates."),
-			NULL
-		},
-		&share_input_scan_wait_lockfile_timeout,
-		300000, 1, 65536, NULL, NULL
-	},
-
 
 	/* End-of-list marker */
 	{
@@ -8360,6 +8349,7 @@ static struct config_string ConfigureNamesString[] =
 		NULL, NULL, NULL
 	},
 
+
 	/* End-of-list marker */
 	{
 		{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4aae1a07/src/include/executor/nodeMaterial.h
----------------------------------------------------------------------
diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h
index 41eb6f7..509f78c 100644
--- a/src/include/executor/nodeMaterial.h
+++ b/src/include/executor/nodeMaterial.h
@@ -15,7 +15,6 @@
 #define NODEMATERIAL_H
 
 #include "nodes/execnodes.h"
-#include "executor/nodeShareInputScan.h"
 
 extern int	ExecCountSlotsMaterial(Material *node);
 extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags);

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4aae1a07/src/include/executor/nodeShareInputScan.h
----------------------------------------------------------------------
diff --git a/src/include/executor/nodeShareInputScan.h b/src/include/executor/nodeShareInputScan.h
index 730ffe1..23025b9 100644
--- a/src/include/executor/nodeShareInputScan.h
+++ b/src/include/executor/nodeShareInputScan.h
@@ -30,7 +30,6 @@
 #define NODESHAREINPUTSCAN_H
 
 #include "nodes/execnodes.h"
-
 extern int ExecCountSlotsShareInputScan(ShareInputScan* node);
 extern ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags);
 extern TupleTableSlot *ExecShareInputScan(ShareInputScanState *node);
@@ -53,5 +52,4 @@ static inline gpmon_packet_t * GpmonPktFromShareInputState(ShareInputScanState *
 	return &node->ss.ps.gpmon_pkt;
 }
 
-extern void generate_lock_file_name(char* p, int size, int share_id, const char* name);
 #endif   /* NODESHAREINPUTSCAN_H */

http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/4aae1a07/src/include/utils/guc.h
----------------------------------------------------------------------
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 530abe5..546584f 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -444,8 +444,6 @@ extern bool optimizer_prefer_scalar_dqa_multistage_agg;
 extern bool optimizer_parallel_union;
 extern bool optimizer_array_constraints;
 
-/* Timeout for shareinputscan writer/reader wait for lock files */ 
-extern int share_input_scan_wait_lockfile_timeout;
 
 /* fallback in ranger ACL check */
 extern int information_schema_namespace_oid;