You are viewing a plain-text version of this content; the link to the canonical version was lost in this conversion.
Posted to commits@hawq.apache.org by zt...@apache.org on 2021/12/04 09:10:52 UTC
[hawq] 02/02: HAWQ-1817. add exception handling for dispatcher
This is an automated email from the ASF dual-hosted git repository.
ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git
commit c3b1c41cf624cceb5a2c7dd3f8729511c298c9dc
Author: ztao1987 <zh...@gmail.com>
AuthorDate: Sat Dec 4 17:10:13 2021 +0800
HAWQ-1817. add exception handling for dispatcher
---
src/backend/cdb/cdbconn.c | 20 +++++++++++++++++++-
src/backend/cdb/cdbvars.c | 1 +
src/backend/cdb/dispatcher_mgr.c | 4 ++--
src/backend/cdb/dispatcher_new.c | 10 ++++++++++
src/backend/cdb/executormgr_new.c | 32 +++++++++++++++++++++-----------
src/backend/gp_libpq_fe/fe-connect.c | 12 ++++++------
src/backend/gp_libpq_fe/fe-protocol3.c | 1 +
src/backend/gp_libpq_fe/gp-libpq-fe.h | 2 +-
src/backend/tcop/postgres.c | 5 +++++
src/include/cdb/cdbvars.h | 2 ++
src/include/cdb/dispatcher_new.h | 1 +
11 files changed, 69 insertions(+), 21 deletions(-)
diff --git a/src/backend/cdb/cdbconn.c b/src/backend/cdb/cdbconn.c
index 87aed62..dfa6178 100644
--- a/src/backend/cdb/cdbconn.c
+++ b/src/backend/cdb/cdbconn.c
@@ -445,8 +445,26 @@ cdbconn_main_doconnect(SegmentDatabaseDescriptor *segdbDesc,
* Wait for it to respond giving us the TCP port number
* where it listens for connections from the gang below.
*/
- PQgetQEsDetail(segdbDesc->conn, connMsg->data, connMsg->len);
+ if(!PQgetQEsDetail(segdbDesc->conn, connMsg->data, connMsg->len)){
+ if (!segdbDesc->errcode)
+ segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
+ write_log("Master unable to getQEsDetail from %s : %s\nConnection option: %s",
+ segdbDesc->whoami, PQerrorMessage(segdbDesc->conn),
+ connection_string);
+
+ appendPQExpBuffer(&segdbDesc->error_message,
+ "Master unable to getQEsDetail from %s: %s\n",
+ segdbDesc->whoami, PQerrorMessage(segdbDesc->conn));
+
+ /* Don't use elog, it's not thread-safe */
+ if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
+ write_log("%s\n", segdbDesc->error_message.data);
+
+ PQfinish(segdbDesc->conn);
+ segdbDesc->conn = NULL;
+ return false;
+ }
/*
* Check for connection reset.
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index c5a770b..0225d34 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -54,6 +54,7 @@
GpRoleValue Gp_role; /* Role paid by this Greenplum Database backend */
char *gp_role_string; /* Staging area for guc.c */
bool gp_set_read_only; /* Staging area for guc.c */
+bool proxy_dispatcher_prepare_error = false;
GpRoleValue Gp_session_role; /* Role paid by this Greenplum Database backend */
char *gp_session_role_string; /* Staging area for guc.c */
diff --git a/src/backend/cdb/dispatcher_mgr.c b/src/backend/cdb/dispatcher_mgr.c
index b1e4feb..a8286f5 100644
--- a/src/backend/cdb/dispatcher_mgr.c
+++ b/src/backend/cdb/dispatcher_mgr.c
@@ -91,7 +91,7 @@ void mainDispatchFuncConnect(struct MyQueryExecutorGroup *qeGrp,
foreach (lc, qeGrp->qes) {
myQe = lfirst(lc);
- if (workermgr_should_query_stop(state)) break;
+ if (workermgr_should_query_stop(state)) goto error;
if (!executormgr_main_doconnect(myQe)) goto error;
}
@@ -201,7 +201,7 @@ void proxyDispatchFuncConnect(struct MyQueryExecutorGroup *qeGrp,
foreach (lc, qeGrp->qes) {
myQe = lfirst(lc);
- if (workermgr_should_query_stop(state)) break;
+ if (workermgr_should_query_stop(state)) goto error;
if (!executormgr_proxy_doconnect(myQe)) {
write_log("%s: failed to startup new qe.", __func__);
diff --git a/src/backend/cdb/dispatcher_new.c b/src/backend/cdb/dispatcher_new.c
index 09053f2..f00afce 100644
--- a/src/backend/cdb/dispatcher_new.c
+++ b/src/backend/cdb/dispatcher_new.c
@@ -1383,6 +1383,12 @@ void proxyDispatchPrepare(struct ProxyDispatchData *data) {
workermgr_wait_job(state);
}
+ if(proxyDispatchHasError(data)){
+ MemoryContextSwitchTo(old);
+ ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+ errmsg("proxy dispatcher failed to connect to segment")));
+ }
+
MemoryContextSwitchTo(old);
}
@@ -1451,6 +1457,10 @@ void proxyDispatchCleanUp(struct ProxyDispatchData **data) {
*data = NULL;
}
+/*
+ * proxyDispatchHasError -- report whether proxy dispatch recorded an error.
+ *
+ * Returns true when data->cdata.results is non-NULL and the structure it
+ * points to carries a nonzero errcode; false otherwise. data itself is
+ * dereferenced unconditionally, so it must not be NULL.
+ *
+ * proxyDispatchPrepare() calls this after workermgr_wait_job() returns and
+ * raises ERRCODE_GP_INTERCONNECTION_ERROR when it reports true.
+ *
+ * NOTE(review): only the errcode stored in *cdata.results is consulted;
+ * confirm that every per-executor failure (e.g. one recorded only in a
+ * segment descriptor's errcode) is propagated there, for instance via
+ * cdbdisp_seterrcode().
+ */
+bool proxyDispatchHasError(struct ProxyDispatchData *data) {
+ return data->cdata.results && data->cdata.results->errcode;
+}
+
void mainDispatchStmtNode(struct Node *node, struct QueryContextInfo *ctx,
struct QueryResource *resource,
struct DispatchDataResult *result) {
diff --git a/src/backend/cdb/executormgr_new.c b/src/backend/cdb/executormgr_new.c
index 910ca6f..8085783 100644
--- a/src/backend/cdb/executormgr_new.c
+++ b/src/backend/cdb/executormgr_new.c
@@ -316,15 +316,25 @@ bool executormgr_main_consumeData(struct MyQueryExecutor *qe) {
done = true;
break;
}
- int qeIndex;
- cdbdisp_deserializeDispatchResult(NULL, &qeIndex, &conn->dispBuffer);
- myQe = getTaskRefQE((struct MyDispatchTask *)(list_nth(
- list_nth(getTaskPerSegmentList(qe->refTask), qe->execIndex), qeIndex)));
- struct CdbDispatchResult *refResult = myQe->refResult;
- cdbdisp_deserializeDispatchResult(refResult, &qeIndex, &conn->dispBuffer);
- conn->asyncStatus = PGASYNC_BUSY;
- if (refResult->errcode != 0) {
- cdbdisp_seterrcode(refResult->errcode, -1, refResult);
+ if (PQstatus(conn) == CONNECTION_BAD) goto error;
+
+ if (conn->dispBuffer.len != 0) {
+ int qeIndex;
+ cdbdisp_deserializeDispatchResult(NULL, &qeIndex, &conn->dispBuffer);
+ myQe = getTaskRefQE((struct MyDispatchTask *)(list_nth(
+ list_nth(getTaskPerSegmentList(qe->refTask), qe->execIndex),
+ qeIndex)));
+ struct CdbDispatchResult *refResult = myQe->refResult;
+ cdbdisp_deserializeDispatchResult(refResult, &qeIndex, &conn->dispBuffer);
+ conn->asyncStatus = PGASYNC_BUSY;
+ if (refResult->errcode != 0) {
+ cdbdisp_seterrcode(refResult->errcode, -1, refResult);
+ goto error;
+ }
+ } else {
+ PQgetResult(conn);
+ write_log("main dispatcher got error msg from proxy dispatcher: %s",
+ conn->errorMessage.data);
goto error;
}
}
@@ -467,7 +477,7 @@ bool executormgr_main_cancel(struct MyQueryExecutor *qe) {
char errbuf[256];
MemSet(errbuf, 0, sizeof(errbuf));
bool success = (PQcancel(cn, errbuf, sizeof(errbuf)) != 0);
- if(!success){
+ if (!success) {
write_log("executormgr_main_cancel cancel failed, %s.", errbuf);
}
PQfreeCancel(cn);
@@ -488,7 +498,7 @@ bool executormgr_proxy_cancel(struct MyQueryExecutor *qe, bool cancelRequest) {
char errbuf[256];
MemSet(errbuf, 0, sizeof(errbuf));
success = (PQcancel(cn, errbuf, sizeof(errbuf)) != 0);
- if(!success){
+ if (!success) {
write_log("executormgr_proxy_cancel cancel failed, %s.", errbuf);
}
PQfreeCancel(cn);
diff --git a/src/backend/gp_libpq_fe/fe-connect.c b/src/backend/gp_libpq_fe/fe-connect.c
index 4a36b68..f75d7da 100644
--- a/src/backend/gp_libpq_fe/fe-connect.c
+++ b/src/backend/gp_libpq_fe/fe-connect.c
@@ -3501,9 +3501,9 @@ PQoptions(const PGconn *conn)
return conn->pgoptions;
}
-int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) {
+bool PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) {
if (!conn || (PQstatus(conn) == CONNECTION_BAD))
- return -1;
+ return false;
pqPacketSend(conn, 'V', connMsg, connMsgLen+1);
resetPQExpBuffer(&conn->dispBuffer);
@@ -3512,21 +3512,21 @@ int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) {
if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
pqParseInput3(conn);
else
- return -1;
+ return false;
while (!conn->dispBuffer.len)
{
pqWait(TRUE, FALSE, conn);
if (pqReadData(conn) < 0)
- return -1;
+ return false;
if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
pqParseInput3(conn);
else
- return -1;
+ return false;
}
- return 0;
+ return true;
}
/* GPDB function to retrieve QE-backend details (motion listener) */
diff --git a/src/backend/gp_libpq_fe/fe-protocol3.c b/src/backend/gp_libpq_fe/fe-protocol3.c
index 89b055f..1254f90 100644
--- a/src/backend/gp_libpq_fe/fe-protocol3.c
+++ b/src/backend/gp_libpq_fe/fe-protocol3.c
@@ -293,6 +293,7 @@ pqParseInput3(PGconn *conn)
}
break;
case 'E': /* error return */
+ conn->dispBuffer.len = 0;
if (pqGetErrorNotice3(conn, true))
return;
conn->asyncStatus = PGASYNC_READY;
diff --git a/src/backend/gp_libpq_fe/gp-libpq-fe.h b/src/backend/gp_libpq_fe/gp-libpq-fe.h
index 2184033..be1cc83 100644
--- a/src/backend/gp_libpq_fe/gp-libpq-fe.h
+++ b/src/backend/gp_libpq_fe/gp-libpq-fe.h
@@ -320,7 +320,7 @@ extern char *PQport(const PGconn *conn);
extern char *PQtty(const PGconn *conn);
extern char *PQoptions(const PGconn *conn);
extern int PQgetQEdetail(PGconn *conn, bool alwaysFetch); /* GPDB -- retrieve QE-backend details. */
-extern int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen);
+extern bool PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen);
extern ConnStatusType PQstatus(const PGconn *conn);
extern PGTransactionStatusType PQtransactionStatus(const PGconn *conn);
extern const char *PQparameterStatus(const PGconn *conn,
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 085ab08..ba6c17a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4784,6 +4784,10 @@ PostgresMain(int argc, char *argv[], const char *username)
/* Now we can allow interrupts again */
RESUME_INTERRUPTS();
+
+ if(proxy_dispatcher_prepare_error){
+ exit(0);
+ }
}
/* We can now handle ereport(ERROR) */
@@ -5512,6 +5516,7 @@ PostgresMain(int argc, char *argv[], const char *username)
}
PG_CATCH();
{
+ proxy_dispatcher_prepare_error = true;
proxyDispatchCleanUp(&dispatchData);
PG_RE_THROW();
}
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index fb7f30d..5b58562 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -144,6 +144,8 @@ extern char *gp_role_string; /* Use by guc.c as staging area for value. */
extern const char *assign_gp_role(const char *newval, bool doit, GucSource source);
extern const char *show_gp_role(void);
+extern bool proxy_dispatcher_prepare_error;
+
extern bool gp_reraise_signal; /* try to force a core dump ?*/
extern bool gp_version_mismatch_error; /* Enforce same-version on QD&QE. */
diff --git a/src/include/cdb/dispatcher_new.h b/src/include/cdb/dispatcher_new.h
index ea84071..c0aeb9f 100644
--- a/src/include/cdb/dispatcher_new.h
+++ b/src/include/cdb/dispatcher_new.h
@@ -59,6 +59,7 @@ extern void sendSegQEDetails(struct ProxyDispatchData *data);
extern void proxyDispatchRun(struct ProxyDispatchData *data, char *connMsg);
extern void proxyDispatchWait(struct ProxyDispatchData *data);
extern void proxyDispatchCleanUp(struct ProxyDispatchData **data);
+extern bool proxyDispatchHasError(struct ProxyDispatchData *data);
// dispatch statement
extern void mainDispatchStmtNode(struct Node *node,