You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@hawq.apache.org by zt...@apache.org on 2021/12/04 09:10:52 UTC

[hawq] 02/02: HAWQ-1817. add exception handling for dispatcher

This is an automated email from the ASF dual-hosted git repository.

ztao1987 pushed a commit to branch ztao
in repository https://gitbox.apache.org/repos/asf/hawq.git

commit c3b1c41cf624cceb5a2c7dd3f8729511c298c9dc
Author: ztao1987 <zh...@gmail.com>
AuthorDate: Sat Dec 4 17:10:13 2021 +0800

    HAWQ-1817. add exception handling for dispatcher
---
 src/backend/cdb/cdbconn.c              | 20 +++++++++++++++++++-
 src/backend/cdb/cdbvars.c              |  1 +
 src/backend/cdb/dispatcher_mgr.c       |  4 ++--
 src/backend/cdb/dispatcher_new.c       | 10 ++++++++++
 src/backend/cdb/executormgr_new.c      | 32 +++++++++++++++++++++-----------
 src/backend/gp_libpq_fe/fe-connect.c   | 12 ++++++------
 src/backend/gp_libpq_fe/fe-protocol3.c |  1 +
 src/backend/gp_libpq_fe/gp-libpq-fe.h  |  2 +-
 src/backend/tcop/postgres.c            |  5 +++++
 src/include/cdb/cdbvars.h              |  2 ++
 src/include/cdb/dispatcher_new.h       |  1 +
 11 files changed, 69 insertions(+), 21 deletions(-)

diff --git a/src/backend/cdb/cdbconn.c b/src/backend/cdb/cdbconn.c
index 87aed62..dfa6178 100644
--- a/src/backend/cdb/cdbconn.c
+++ b/src/backend/cdb/cdbconn.c
@@ -445,8 +445,26 @@ cdbconn_main_doconnect(SegmentDatabaseDescriptor *segdbDesc,
    * Wait for it to respond giving us the TCP port number
    * where it listens for connections from the gang below.
    */
-  PQgetQEsDetail(segdbDesc->conn, connMsg->data, connMsg->len);
+  if(!PQgetQEsDetail(segdbDesc->conn, connMsg->data, connMsg->len)){
+    if (!segdbDesc->errcode)
+      segdbDesc->errcode = ERRCODE_GP_INTERCONNECTION_ERROR;
 
+    write_log("Master unable to getQEsDetail from %s : %s\nConnection option: %s",
+              segdbDesc->whoami, PQerrorMessage(segdbDesc->conn),
+              connection_string);
+
+    appendPQExpBuffer(&segdbDesc->error_message,
+                      "Master unable to getQEsDetail from %s: %s\n",
+                      segdbDesc->whoami, PQerrorMessage(segdbDesc->conn));
+
+    /* Don't use elog, it's not thread-safe */
+    if (gp_log_gang >= GPVARS_VERBOSITY_DEBUG)
+      write_log("%s\n", segdbDesc->error_message.data);
+
+    PQfinish(segdbDesc->conn);
+    segdbDesc->conn = NULL;
+    return false;
+  }
 
   /*
    * Check for connection reset.
diff --git a/src/backend/cdb/cdbvars.c b/src/backend/cdb/cdbvars.c
index c5a770b..0225d34 100644
--- a/src/backend/cdb/cdbvars.c
+++ b/src/backend/cdb/cdbvars.c
@@ -54,6 +54,7 @@
 GpRoleValue Gp_role;			/* Role paid by this Greenplum Database backend */
 char	   *gp_role_string;	/* Staging area for guc.c */
 bool		gp_set_read_only;	/* Staging area for guc.c */
+bool        proxy_dispatcher_prepare_error = false;
 
 GpRoleValue Gp_session_role;	/* Role paid by this Greenplum Database backend */
 char	   *gp_session_role_string;	/* Staging area for guc.c */
diff --git a/src/backend/cdb/dispatcher_mgr.c b/src/backend/cdb/dispatcher_mgr.c
index b1e4feb..a8286f5 100644
--- a/src/backend/cdb/dispatcher_mgr.c
+++ b/src/backend/cdb/dispatcher_mgr.c
@@ -91,7 +91,7 @@ void mainDispatchFuncConnect(struct MyQueryExecutorGroup *qeGrp,
   foreach (lc, qeGrp->qes) {
     myQe = lfirst(lc);
 
-    if (workermgr_should_query_stop(state)) break;
+    if (workermgr_should_query_stop(state)) goto error;
 
     if (!executormgr_main_doconnect(myQe)) goto error;
   }
@@ -201,7 +201,7 @@ void proxyDispatchFuncConnect(struct MyQueryExecutorGroup *qeGrp,
   foreach (lc, qeGrp->qes) {
     myQe = lfirst(lc);
 
-    if (workermgr_should_query_stop(state)) break;
+    if (workermgr_should_query_stop(state)) goto error;
 
     if (!executormgr_proxy_doconnect(myQe)) {
       write_log("%s: failed to startup new qe.", __func__);
diff --git a/src/backend/cdb/dispatcher_new.c b/src/backend/cdb/dispatcher_new.c
index 09053f2..f00afce 100644
--- a/src/backend/cdb/dispatcher_new.c
+++ b/src/backend/cdb/dispatcher_new.c
@@ -1383,6 +1383,12 @@ void proxyDispatchPrepare(struct ProxyDispatchData *data) {
     workermgr_wait_job(state);
   }
 
+  if(proxyDispatchHasError(data)){
+    MemoryContextSwitchTo(old);
+    ereport(ERROR, (errcode(ERRCODE_GP_INTERCONNECTION_ERROR),
+                    errmsg("proxy dispatcher failed to connect to segment")));
+  }
+
   MemoryContextSwitchTo(old);
 }
 
@@ -1451,6 +1457,10 @@ void proxyDispatchCleanUp(struct ProxyDispatchData **data) {
   *data = NULL;
 }
 
+bool proxyDispatchHasError(struct ProxyDispatchData *data) {
+  return data->cdata.results && data->cdata.results->errcode;
+}
+
 void mainDispatchStmtNode(struct Node *node, struct QueryContextInfo *ctx,
                           struct QueryResource *resource,
                           struct DispatchDataResult *result) {
diff --git a/src/backend/cdb/executormgr_new.c b/src/backend/cdb/executormgr_new.c
index 910ca6f..8085783 100644
--- a/src/backend/cdb/executormgr_new.c
+++ b/src/backend/cdb/executormgr_new.c
@@ -316,15 +316,25 @@ bool executormgr_main_consumeData(struct MyQueryExecutor *qe) {
       done = true;
       break;
     }
-    int qeIndex;
-    cdbdisp_deserializeDispatchResult(NULL, &qeIndex, &conn->dispBuffer);
-    myQe = getTaskRefQE((struct MyDispatchTask *)(list_nth(
-        list_nth(getTaskPerSegmentList(qe->refTask), qe->execIndex), qeIndex)));
-    struct CdbDispatchResult *refResult = myQe->refResult;
-    cdbdisp_deserializeDispatchResult(refResult, &qeIndex, &conn->dispBuffer);
-    conn->asyncStatus = PGASYNC_BUSY;
-    if (refResult->errcode != 0) {
-      cdbdisp_seterrcode(refResult->errcode, -1, refResult);
+    if (PQstatus(conn) == CONNECTION_BAD) goto error;
+
+    if (conn->dispBuffer.len != 0) {
+      int qeIndex;
+      cdbdisp_deserializeDispatchResult(NULL, &qeIndex, &conn->dispBuffer);
+      myQe = getTaskRefQE((struct MyDispatchTask *)(list_nth(
+          list_nth(getTaskPerSegmentList(qe->refTask), qe->execIndex),
+          qeIndex)));
+      struct CdbDispatchResult *refResult = myQe->refResult;
+      cdbdisp_deserializeDispatchResult(refResult, &qeIndex, &conn->dispBuffer);
+      conn->asyncStatus = PGASYNC_BUSY;
+      if (refResult->errcode != 0) {
+        cdbdisp_seterrcode(refResult->errcode, -1, refResult);
+        goto error;
+      }
+    } else {
+      PQgetResult(conn);
+      write_log("main dispatcher got error msg from proxy dispatcher: %s",
+                conn->errorMessage.data);
       goto error;
     }
   }
@@ -467,7 +477,7 @@ bool executormgr_main_cancel(struct MyQueryExecutor *qe) {
   char errbuf[256];
   MemSet(errbuf, 0, sizeof(errbuf));
   bool success = (PQcancel(cn, errbuf, sizeof(errbuf)) != 0);
-  if(!success){
+  if (!success) {
     write_log("executormgr_main_cancel cancel failed, %s.", errbuf);
   }
   PQfreeCancel(cn);
@@ -488,7 +498,7 @@ bool executormgr_proxy_cancel(struct MyQueryExecutor *qe, bool cancelRequest) {
     char errbuf[256];
     MemSet(errbuf, 0, sizeof(errbuf));
     success = (PQcancel(cn, errbuf, sizeof(errbuf)) != 0);
-    if(!success){
+    if (!success) {
       write_log("executormgr_proxy_cancel cancel failed, %s.", errbuf);
     }
     PQfreeCancel(cn);
diff --git a/src/backend/gp_libpq_fe/fe-connect.c b/src/backend/gp_libpq_fe/fe-connect.c
index 4a36b68..f75d7da 100644
--- a/src/backend/gp_libpq_fe/fe-connect.c
+++ b/src/backend/gp_libpq_fe/fe-connect.c
@@ -3501,9 +3501,9 @@ PQoptions(const PGconn *conn)
 	return conn->pgoptions;
 }
 
-int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) {
+bool PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) {
   if (!conn || (PQstatus(conn) == CONNECTION_BAD))
-      return -1;
+      return false;
   pqPacketSend(conn, 'V', connMsg, connMsgLen+1);
 
   resetPQExpBuffer(&conn->dispBuffer);
@@ -3512,21 +3512,21 @@ int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen) {
   if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
     pqParseInput3(conn);
   else
-    return -1;
+    return false;
 
   while (!conn->dispBuffer.len)
   {
     pqWait(TRUE, FALSE, conn);
     if (pqReadData(conn) < 0)
-      return -1;
+      return false;
 
     if (PG_PROTOCOL_MAJOR(conn->pversion) >= 3)
       pqParseInput3(conn);
     else
-      return -1;
+      return false;
   }
 
-  return 0;
+  return true;
 }
 
 /* GPDB function to retrieve QE-backend details (motion listener) */
diff --git a/src/backend/gp_libpq_fe/fe-protocol3.c b/src/backend/gp_libpq_fe/fe-protocol3.c
index 89b055f..1254f90 100644
--- a/src/backend/gp_libpq_fe/fe-protocol3.c
+++ b/src/backend/gp_libpq_fe/fe-protocol3.c
@@ -293,6 +293,7 @@ pqParseInput3(PGconn *conn)
 				}
 				break;
 				case 'E':		/* error return */
+				    conn->dispBuffer.len = 0;
 					if (pqGetErrorNotice3(conn, true))
 						return;
 					conn->asyncStatus = PGASYNC_READY;
diff --git a/src/backend/gp_libpq_fe/gp-libpq-fe.h b/src/backend/gp_libpq_fe/gp-libpq-fe.h
index 2184033..be1cc83 100644
--- a/src/backend/gp_libpq_fe/gp-libpq-fe.h
+++ b/src/backend/gp_libpq_fe/gp-libpq-fe.h
@@ -320,7 +320,7 @@ extern char *PQport(const PGconn *conn);
 extern char *PQtty(const PGconn *conn);
 extern char *PQoptions(const PGconn *conn);
 extern int	PQgetQEdetail(PGconn *conn, bool alwaysFetch); /* GPDB -- retrieve QE-backend details. */
-extern int PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen);
+extern bool PQgetQEsDetail(PGconn *conn, char *connMsg, int connMsgLen);
 extern ConnStatusType PQstatus(const PGconn *conn);
 extern PGTransactionStatusType PQtransactionStatus(const PGconn *conn);
 extern const char *PQparameterStatus(const PGconn *conn,
diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c
index 085ab08..ba6c17a 100644
--- a/src/backend/tcop/postgres.c
+++ b/src/backend/tcop/postgres.c
@@ -4784,6 +4784,10 @@ PostgresMain(int argc, char *argv[], const char *username)
 
 		/* Now we can allow interrupts again */
 		RESUME_INTERRUPTS();
+
+		if(proxy_dispatcher_prepare_error){
+		  exit(0);
+		}
 	}
 
 	/* We can now handle ereport(ERROR) */
@@ -5512,6 +5516,7 @@ PostgresMain(int argc, char *argv[], const char *username)
 			  }
 			  PG_CATCH();
 			  {
+			    proxy_dispatcher_prepare_error = true;
 			    proxyDispatchCleanUp(&dispatchData);
 			    PG_RE_THROW();
 			  }
diff --git a/src/include/cdb/cdbvars.h b/src/include/cdb/cdbvars.h
index fb7f30d..5b58562 100644
--- a/src/include/cdb/cdbvars.h
+++ b/src/include/cdb/cdbvars.h
@@ -144,6 +144,8 @@ extern char *gp_role_string;	/* Use by guc.c as staging area for value. */
 extern const char *assign_gp_role(const char *newval, bool doit, GucSource source);
 extern const char *show_gp_role(void);
 
+extern bool proxy_dispatcher_prepare_error;
+
 extern bool gp_reraise_signal; /* try to force a core dump ?*/
 
 extern bool gp_version_mismatch_error;	/* Enforce same-version on QD&QE. */
diff --git a/src/include/cdb/dispatcher_new.h b/src/include/cdb/dispatcher_new.h
index ea84071..c0aeb9f 100644
--- a/src/include/cdb/dispatcher_new.h
+++ b/src/include/cdb/dispatcher_new.h
@@ -59,6 +59,7 @@ extern void sendSegQEDetails(struct ProxyDispatchData *data);
 extern void proxyDispatchRun(struct ProxyDispatchData *data, char *connMsg);
 extern void proxyDispatchWait(struct ProxyDispatchData *data);
 extern void proxyDispatchCleanUp(struct ProxyDispatchData **data);
+extern bool proxyDispatchHasError(struct ProxyDispatchData *data);
 
 // dispatch statement
 extern void mainDispatchStmtNode(struct Node *node,