You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hawq.apache.org by hu...@apache.org on 2017/06/21 23:35:22 UTC
incubator-hawq git commit: fix share input scan bug for writer part
Repository: incubator-hawq
Updated Branches:
refs/heads/master 4ef7022e7 -> 339806f3a
fix share input scan bug for writer part
Project: http://git-wip-us.apache.org/repos/asf/incubator-hawq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-hawq/commit/339806f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-hawq/tree/339806f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-hawq/diff/339806f3
Branch: refs/heads/master
Commit: 339806f3a40cf85686496412984e65ebfb481dbd
Parents: 4ef7022
Author: amyrazz44 <ab...@pivotal.io>
Authored: Mon May 8 17:27:07 2017 +0800
Committer: Ruilong Huo <hu...@Ruilongs-MacBook-Pro.local>
Committed: Thu Jun 22 07:32:19 2017 +0800
----------------------------------------------------------------------
src/backend/executor/nodeMaterial.c | 38 +++++-
src/backend/executor/nodeShareInputScan.c | 158 ++++++++++++++++++++++++-
src/backend/utils/misc/guc.c | 12 +-
src/include/executor/nodeMaterial.h | 1 +
src/include/executor/nodeShareInputScan.h | 2 +
src/include/utils/guc.h | 2 +
6 files changed, 203 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/executor/nodeMaterial.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeMaterial.c b/src/backend/executor/nodeMaterial.c
index f2b82b2..4589351 100644
--- a/src/backend/executor/nodeMaterial.c
+++ b/src/backend/executor/nodeMaterial.c
@@ -41,19 +41,21 @@
#include "postgres.h"
#include "executor/executor.h"
-#include "executor/nodeMaterial.h"
#include "executor/instrument.h" /* Instrumentation */
#include "utils/tuplestorenew.h"
-
+#include "executor/nodeMaterial.h"
#include "miscadmin.h"
#include "cdb/cdbvars.h"
+#include "postmaster/primary_mirror_mode.h"
+
+static int sisc_writer_lock_fd = -1;
static void ExecMaterialExplainEnd(PlanState *planstate, struct StringInfoData *buf);
static void ExecChildRescan(MaterialState *node, ExprContext *exprCtxt);
static void DestroyTupleStore(MaterialState *node);
static void ExecMaterialResetWorkfileState(MaterialState *node);
-
+static void mkLockFileForWriter(int size, int share_id, char * name);
/* ----------------------------------------------------------------
* ExecMaterial
@@ -115,6 +117,7 @@ ExecMaterial(MaterialState *node)
ts = ntuplestore_create_readerwriter(rwfile_prefix, PlanStateOperatorMemKB((PlanState *)node) * 1024, true);
tsa = ntuplestore_create_accessor(ts, true);
+ mkLockFileForWriter(MAXPGPATH, ma->share_id, "writer");
}
else
{
@@ -247,6 +250,8 @@ ExecMaterial(MaterialState *node)
node->share_lk_ctxt = shareinput_writer_notifyready(ma->share_id, ma->nsharer_xslice,
estate->es_plannedstmt->planGen);
+ if(sisc_writer_lock_fd > 0)
+ close(sisc_writer_lock_fd);
}
}
return NULL;
@@ -759,3 +764,30 @@ ExecEagerFreeMaterial(MaterialState *node)
}
}
+
+/*
+ * mkLockFileForWriter
+ *
+ * Create (if it does not exist yet) the writer's lock file for the share input
+ * scan identified by share_id, and take a non-blocking exclusive flock() on it.
+ * The descriptor is stored in sisc_writer_lock_fd; flock() locks stay held until
+ * that descriptor is closed or the process exits.  Readers probe the same file
+ * with flock() to decide whether the writer is still alive, so a lock that is
+ * never acquired here can make a reader conclude the writer has gone away.
+ *
+ * size     - size in bytes of the buffer used to build the file name
+ * share_id - share id of the shared input scan
+ * name     - role suffix appended to the lock name (callers pass "writer")
+ *
+ * Raises ERROR if the lock file cannot be opened/created.  Failure to take the
+ * lock is only logged; the writer still proceeds.
+ */
+static void mkLockFileForWriter(int size, int share_id, char * name)
+{
+	char	   *lock_file;
+	int			lock;
+
+	lock_file = (char *) palloc0(size);
+	generate_lock_file_name(lock_file, size, share_id, name);
+	elog(DEBUG3, "The lock file for writer in SISC is %s", lock_file);
+
+	/* flock() only needs a read-only descriptor; the file is never executed. */
+	sisc_writer_lock_fd = open(lock_file, O_RDONLY | O_CREAT, S_IRUSR | S_IWUSR);
+	if (sisc_writer_lock_fd < 0)
+	{
+		/* Capture errno before elog() machinery gets a chance to clobber it. */
+		int			save_errno = errno;
+
+		elog(ERROR, "Could not create lock file %s for writer in SISC. The error number is %d", lock_file, save_errno);
+	}
+
+	lock = flock(sisc_writer_lock_fd, LOCK_EX | LOCK_NB);
+	if (lock == 0)
+	{
+		/* The normal path: keep it quiet, this runs for every shared writer. */
+		elog(DEBUG3, "Successfully locked lock file \"%s\" for writer in SISC.", lock_file);
+	}
+	else
+	{
+		int			save_errno = errno;
+
+		/*
+		 * Readers rely on this lock to detect a dead writer, so a failure here
+		 * must be visible in the server log rather than hidden at DEBUG3.
+		 */
+		elog(LOG, "Could not lock lock file \"%s\" for writer in SISC . The error number is %d", lock_file, save_errno);
+	}
+	pfree(lock_file);
+}
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/executor/nodeShareInputScan.c
----------------------------------------------------------------------
diff --git a/src/backend/executor/nodeShareInputScan.c b/src/backend/executor/nodeShareInputScan.c
index 049943b..88c695d 100644
--- a/src/backend/executor/nodeShareInputScan.c
+++ b/src/backend/executor/nodeShareInputScan.c
@@ -43,7 +43,6 @@
#include "cdb/cdbvars.h"
#include "executor/executor.h"
#include "executor/nodeShareInputScan.h"
-
#include "utils/tuplestorenew.h"
#include "miscadmin.h"
@@ -552,11 +551,59 @@ static void sisc_lockname(char* p, int size, int share_id, const char* name)
}
}
+
+/*
+ * joint_lock_file_name
+ *
+ * Build the path of a SISC lock file from the FIFO names kept in lk_ctxt:
+ * a name starting with "writer" is appended to lkname_ready, any other name
+ * is appended to lkname_done.
+ *
+ * Returns a palloc'd, zero-filled MAXPGPATH-byte buffer that the caller must
+ * pfree().  The result is truncated to at most MAXPGPATH - 1 characters.
+ */
+char *joint_lock_file_name(ShareInput_Lk_Context *lk_ctxt, char *name)
+{
+	char	   *path = palloc0(MAXPGPATH);
+	const char *prefix = (strncmp("writer", name, strlen("writer")) == 0)
+		? lk_ctxt->lkname_ready
+		: lk_ctxt->lkname_done;
+
+	/* path starts out empty, so the first append may use the whole buffer. */
+	strncat(path, prefix, MAXPGPATH - 1);
+	strncat(path, name, MAXPGPATH - strlen(path) - 1);
+
+	return path;
+}
+
+/*
+ * drop_lock_files
+ *
+ * Remove the writer's and the reader's SISC lock files, whose paths are built
+ * from lk_ctxt by joint_lock_file_name().  A file that is already gone is not
+ * an error.  Any other unlink() failure is logged and otherwise ignored,
+ * because this is called while cleaning up the lock context.
+ */
+void drop_lock_files(ShareInput_Lk_Context *lk_ctxt)
+{
+	char	   *writer_lock_file = joint_lock_file_name(lk_ctxt, "writer");
+	char	   *reader_lock_file = joint_lock_file_name(lk_ctxt, "reader");
+
+	/* A single unlink() avoids the race of checking access() first. */
+	if (unlink(writer_lock_file) == 0)
+		elog(DEBUG3, "Drop writer's lock files %s in SISC", writer_lock_file);
+	else if (errno == ENOENT)
+		elog(DEBUG3, "Writer's lock files %s has been dropped already in SISC", writer_lock_file);
+	else
+		elog(LOG, "could not drop writer's lock file %s in SISC: %m", writer_lock_file);
+
+	/*
+	 * Unlink the reader's own file.  (Previously this unlinked the already
+	 * freed writer path, so the reader's file was never removed.)
+	 */
+	if (unlink(reader_lock_file) == 0)
+		elog(DEBUG3, "Drop reader's lock files %s in SISC", reader_lock_file);
+	else if (errno == ENOENT)
+		elog(DEBUG3, "Reader's lock files %s has been dropped already in SISC", reader_lock_file);
+	else
+		elog(LOG, "could not drop reader's lock file %s in SISC: %m", reader_lock_file);
+
+	pfree(writer_lock_file);
+	pfree(reader_lock_file);
+}
+
static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
{
int err;
elog(DEBUG1, "shareinput_clean_lk_ctxt cleanup lk ctxt %p", lk_ctxt);
+
if(lk_ctxt->readyfd >= 0)
{
@@ -590,6 +637,10 @@ static void shareinput_clean_lk_ctxt(ShareInput_Lk_Context *lk_ctxt)
lk_ctxt->del_done = false;
}
+ elog(DEBUG3, "Begin to drop all the lock files for SISC");
+ drop_lock_files(lk_ctxt);
+ elog(DEBUG3, "End of drop lock files for SISC");
+
gp_free2 (lk_ctxt, sizeof(ShareInput_Lk_Context));
}
@@ -666,6 +717,29 @@ write_retry:
return 0;
}
+
+
+/*
+ * generate_lock_file_name
+ *
+ * Write into p (a buffer of size bytes) the lock file name for the reader or
+ * writer of share input scan share_id.  The base is sisc_lockname()'s "ready"
+ * name when name starts with "writer" and its "done" name otherwise.  name
+ * itself is then appended, bounded so the result fits in size bytes.
+ *
+ * The writer creates its lock file through this function while readers locate
+ * it via joint_lock_file_name(), so the two must produce the same path
+ * (presumably lkname_ready/lkname_done hold sisc_lockname() results; verify).
+ */
+void generate_lock_file_name(char* p, int size, int share_id, const char* name)
+{
+	const char *lock_kind =
+		(strncmp(name, "writer", strlen("writer")) == 0) ? "ready" : "done";
+
+	sisc_lockname(p, size, share_id, lock_kind);
+	strncat(p, name, size - strlen(p) - 1);
+}
+
+
+
/*
* Readiness (a) synchronization.
*
@@ -709,6 +783,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
struct timeval tval;
int n;
char a;
+ int file_exists = -1;
+ int timeout_interval = 0;
+ bool flag = false; //A tag for file exists or not.
+ int lock_fd = -1;
+ int lock = -1;
+ bool is_lock_firsttime = true;
+ char *writer_lock_file = NULL; //current path for lock file.
ShareInput_Lk_Context *pctxt = gp_malloc(sizeof(ShareInput_Lk_Context));
@@ -738,6 +819,9 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
if(pctxt->donefd < 0)
elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);
+ writer_lock_file = joint_lock_file_name(pctxt, "writer");
+ elog(DEBUG3, "The lock file of writer in SISC is %s", writer_lock_file);
+
while(1)
{
CHECK_FOR_INTERRUPTS();
@@ -773,13 +857,13 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
retry_read(pctxt->readyfd, &a, 1);
Assert(rwsize == 1 && a == 'a');
- elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
+ elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready got writer's handshake",
share_id, currentSliceId);
if (planGen == PLANGEN_PLANNER)
{
/* For planner-generated plans, we send ack back after receiving the handshake */
- elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
+ elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready writing ack back to writer",
share_id, currentSliceId);
#if USE_ASSERT_CHECKING
@@ -793,8 +877,70 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
}
else if(n==0)
{
- elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
- share_id, currentSliceId);
+ file_exists = access(writer_lock_file, F_OK);
+ if(file_exists != 0)
+ {
+ elog(DEBUG3, "Wait lock file for writer time out interval is %d", timeout_interval);
+ if(timeout_interval >= share_input_scan_wait_lockfile_timeout || flag == true) //If lock file never exists or disappeared, reader will no longer waiting for writer
+ {
+ elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready time out and break",
+ share_id, currentSliceId);
+ pfree(writer_lock_file);
+ break;
+ }
+ timeout_interval += tval.tv_sec * 1000 + tval.tv_usec;
+ }
+ else
+ {
+ elog(LOG, "writer lock file of shareinput_reader_waitready() is %s", writer_lock_file);
+ flag = true;
+ lock_fd = open(writer_lock_file, O_RDONLY);
+ if(lock_fd < 0)
+ {
+ elog(DEBUG3, "Open writer's lock file %s failed!, error number is %d", writer_lock_file, errno);
+ continue;
+ }
+ lock = flock(lock_fd, LOCK_EX | LOCK_NB);
+ if(lock == -1)
+ {
+ /*
+ * Reader try to lock the lock file which writer created until locked the lock file successfully
+ * which means that writer process quit. If reader lock the lock file failed, it means that writer
+ * process is healthy.
+ */
+ elog(DEBUG3, "Lock writer's lock file %s failed!, error number is %d", writer_lock_file, errno);
+ }
+ else if(lock == 0)
+ {
+ /*
+ * There is one situation to consider about.
+ * Writer need a time interval to lock the lock file after the lock file has been created.
+ * So, if reader lock the lock file ahead of writer, we should unlock it.
+ * If reader lock the lock file after writer, it means that writer process has abort.
+ * We should break the loop to make sure reader no longer wait for writer.
+ */
+ if(is_lock_firsttime == true)
+ {
+ lock = flock(lock_fd, LOCK_UN);
+ is_lock_firsttime = false;
+ elog(DEBUG3, "Lock writer's lock file %s first time successfully in SISC! Unlock it.", writer_lock_file);
+ continue;
+ }
+ else
+ {
+ elog(LOG, "Lock writer's lock file %s successfully in SISC!", writer_lock_file);
+ /* Retry to close the fd in case there is interruption from signal */
+ while ((close(lock_fd) < 0) && (errno == EINTR))
+ {
+ elog(DEBUG3, "Failed to close SISC temporary file due to strerror(errno)");
+ }
+ pfree(writer_lock_file);
+ break;
+ }
+ }
+ elog(DEBUG1, "SISC READER (shareid=%d, slice=%d): Wait ready time out once",
+ share_id, currentSliceId);
+ }
}
else
{
@@ -802,6 +948,7 @@ shareinput_reader_waitready(int share_id, PlanGenerator planGen)
elog(LOG, "SISC READER (shareid=%d, slice=%d): Wait ready try again, errno %d ... ",
share_id, currentSliceId, save_errno);
}
+
}
return (void *) pctxt;
}
@@ -853,7 +1000,6 @@ shareinput_writer_notifyready(int share_id, int xslice, PlanGenerator planGen)
pctxt->donefd = open(pctxt->lkname_done, O_RDWR, 0600);
if(pctxt->donefd < 0)
elog(ERROR, "could not open fifo \"%s\": %m", pctxt->lkname_done);
-
for(n=0; n<xslice; ++n)
{
#if USE_ASSERT_CHECKING
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/backend/utils/misc/guc.c
----------------------------------------------------------------------
diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c
index 6769d3b..87f44c2 100644
--- a/src/backend/utils/misc/guc.c
+++ b/src/backend/utils/misc/guc.c
@@ -668,6 +668,8 @@ bool gp_cte_sharing = false;
char *gp_idf_deduplicate_str;
+int share_input_scan_wait_lockfile_timeout;
+
/* gp_disable_catalog_access_on_segment */
bool gp_disable_catalog_access_on_segment = false;
@@ -6685,6 +6687,15 @@ static struct config_int ConfigureNamesInt[] =
&metadata_cache_max_hdfs_file_num,
524288, 32768, 8388608, NULL, NULL
},
+ {
+ {"share_input_scan_wait_lockfile_timeout", PGC_USERSET, DEVELOPER_OPTIONS,
+ gettext_noop("timeout (in millisecond) for waiting lock file which writer creates."),
+ NULL
+ },
+ &share_input_scan_wait_lockfile_timeout,
+ 300000, 1, 65536, NULL, NULL
+ },
+
/* End-of-list marker */
{
@@ -8349,7 +8360,6 @@ static struct config_string ConfigureNamesString[] =
NULL, NULL, NULL
},
-
/* End-of-list marker */
{
{NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/executor/nodeMaterial.h
----------------------------------------------------------------------
diff --git a/src/include/executor/nodeMaterial.h b/src/include/executor/nodeMaterial.h
index 509f78c..41eb6f7 100644
--- a/src/include/executor/nodeMaterial.h
+++ b/src/include/executor/nodeMaterial.h
@@ -15,6 +15,7 @@
#define NODEMATERIAL_H
#include "nodes/execnodes.h"
+#include "executor/nodeShareInputScan.h"
extern int ExecCountSlotsMaterial(Material *node);
extern MaterialState *ExecInitMaterial(Material *node, EState *estate, int eflags);
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/executor/nodeShareInputScan.h
----------------------------------------------------------------------
diff --git a/src/include/executor/nodeShareInputScan.h b/src/include/executor/nodeShareInputScan.h
index 23025b9..730ffe1 100644
--- a/src/include/executor/nodeShareInputScan.h
+++ b/src/include/executor/nodeShareInputScan.h
@@ -30,6 +30,7 @@
#define NODESHAREINPUTSCAN_H
#include "nodes/execnodes.h"
+
extern int ExecCountSlotsShareInputScan(ShareInputScan* node);
extern ShareInputScanState *ExecInitShareInputScan(ShareInputScan *node, EState *estate, int eflags);
extern TupleTableSlot *ExecShareInputScan(ShareInputScanState *node);
@@ -52,4 +53,5 @@ static inline gpmon_packet_t * GpmonPktFromShareInputState(ShareInputScanState *
return &node->ss.ps.gpmon_pkt;
}
+extern void generate_lock_file_name(char* p, int size, int share_id, const char* name);
#endif /* NODESHAREINPUTSCAN_H */
http://git-wip-us.apache.org/repos/asf/incubator-hawq/blob/339806f3/src/include/utils/guc.h
----------------------------------------------------------------------
diff --git a/src/include/utils/guc.h b/src/include/utils/guc.h
index 546584f..530abe5 100644
--- a/src/include/utils/guc.h
+++ b/src/include/utils/guc.h
@@ -444,6 +444,8 @@ extern bool optimizer_prefer_scalar_dqa_multistage_agg;
extern bool optimizer_parallel_union;
extern bool optimizer_array_constraints;
+/* Timeout for shareinputscan writer/reader wait for lock files */
+extern int share_input_scan_wait_lockfile_timeout;
/* fallback in ranger ACL check */
extern int information_schema_namespace_oid;