You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/16 15:38:34 UTC

[incubator-doris] branch master updated: [Heartbeat] Support fe heartbeat use thrift protocol to get stable response (#5027)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new dfa4133   [Heartbeat] Support fe heartbeat use thrift protocol to get stable response (#5027)
dfa4133 is described below

commit dfa413335f54b72839f1fc47bb1100d3b8f85bf7
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Wed Dec 16 23:38:04 2020 +0800

     [Heartbeat] Support fe heartbeat use thrift protocol to get stable response (#5027)
    
    This PR lets the master FE fetch heartbeat responses from other FEs over the thrift protocol instead of http.
---
 .../java/org/apache/doris/common/ClientPool.java   |  6 +-
 .../main/java/org/apache/doris/common/Config.java  |  9 +++
 .../apache/doris/service/FrontendServiceImpl.java  | 38 ++++++++++
 .../java/org/apache/doris/system/HeartbeatMgr.java | 58 +++++++++++----
 .../org/apache/doris/system/HeartbeatMgrTest.java  | 83 +++++++++++++++++++++-
 gensrc/thrift/FrontendService.thrift               | 23 +++++-
 6 files changed, 197 insertions(+), 20 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 0f0ac40..62b3d12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -62,8 +62,10 @@ public class ClientPool {
         brokerPoolConfig.setMaxWaitMillis(500);    //  wait for the connection
     }
 
-    public static GenericPool<HeartbeatService.Client> heartbeatPool =
-            new GenericPool("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs); 
+    public static GenericPool<HeartbeatService.Client> backendHeartbeatPool = // heartbeat clients for backends
+            new GenericPool<>("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs);
+    public static GenericPool<FrontendService.Client> frontendHeartbeatPool = // heartbeat clients for frontends
+            new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs);
     public static GenericPool<FrontendService.Client> frontendPool =
             new GenericPool("FrontendService", backendConfig, backendTimeoutMs);
     public static GenericPool<BackendService.Client> backendPool =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 6560995..83e5237 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1301,4 +1301,13 @@ public class Config extends ConfigBase {
      */
     @ConfField
     public static boolean enable_alpha_rowset = false;
+
+    /**
+     * Works around read timeouts when fetching frontend heartbeat responses.
+     * When set to true, the master fetches frontend heartbeat responses over the thrift protocol
+     * instead of http. The default is false to stay compatible with older versions;
+     * do not change it to true until every frontend has been upgraded.
+     */
+    @ConfField(mutable = true, masterOnly = true)
+    public static boolean enable_fe_heartbeat_by_thrift = false;
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ca93063..7cd1c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.PatternMatcher;
 import org.apache.doris.common.ThriftServerContext;
 import org.apache.doris.common.ThriftServerEventProcessor;
 import org.apache.doris.common.UserException;
+import org.apache.doris.common.Version;
 import org.apache.doris.load.EtlStatus;
 import org.apache.doris.load.LoadJob;
 import org.apache.doris.load.MiniEtlTaskInfo;
@@ -64,6 +65,9 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
 import org.apache.doris.thrift.TFeResult;
 import org.apache.doris.thrift.TFetchResourceResult;
 import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendResult;
+import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
 import org.apache.doris.thrift.TGetDbsParams;
 import org.apache.doris.thrift.TGetDbsResult;
 import org.apache.doris.thrift.TGetTablesParams;
@@ -959,6 +963,40 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         return new TStatus(TStatusCode.CANCELLED);
     }
 
+    /**
+     * Answers a frontend-to-frontend heartbeat ping.
+     * The status is OK only when this frontend is ready and the request's cluster id and token
+     * match the local ones; otherwise it is FAILED and msg holds the reason.
+     * @param request cluster id and token presented by the caller
+     * @return the ping result; never null
+     */
+    @Override
+    public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) throws TException {
+        Catalog catalog = Catalog.getCurrentCatalog();
+        TFrontendPingFrontendResult result = new TFrontendPingFrontendResult();
+        // 'version' is a required string field, and the thrift-generated validate() rejects a null
+        // required object field when the result is serialized, so give it a value on every path.
+        result.setVersion("");
+        result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
+        if (!catalog.isReady()) {
+            result.setMsg("not ready");
+        } else if (request.getClusterId() != catalog.getClusterId()) {
+            result.setMsg("invalid cluster id: " + catalog.getClusterId());
+        } else if (!request.getToken().equals(catalog.getToken())) {
+            // never send the expected token to a caller that failed to present it
+            result.setMsg("invalid token");
+        } else {
+            // cluster id and token are valid, return replayed journal id
+            result.setStatus(TFrontendPingFrontendStatusCode.OK);
+            result.setMsg("success");
+            result.setReplayedJournalId(catalog.getReplayedJournalId());
+            result.setQueryPort(Config.query_port);
+            result.setRpcPort(Config.rpc_port);
+            result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
+        }
+        return result;
+    }
+
     private TNetworkAddress getClientAddr() {
         ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
         // For NonBlockingServer, we can not get client ip.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index cd5da57..3b197b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -30,12 +30,16 @@ import org.apache.doris.http.rest.BootstrapFinishAction;
 import org.apache.doris.persist.HbPackage;
 import org.apache.doris.service.FrontendOptions;
 import org.apache.doris.system.HeartbeatResponse.HbStatus;
+import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.HeartbeatService;
 import org.apache.doris.thrift.TBackendInfo;
 import org.apache.doris.thrift.TBrokerOperationStatus;
 import org.apache.doris.thrift.TBrokerOperationStatusCode;
 import org.apache.doris.thrift.TBrokerPingBrokerRequest;
 import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.TFrontendPingFrontendRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendResult;
+import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
 import org.apache.doris.thrift.THeartbeatResult;
 import org.apache.doris.thrift.TMasterInfo;
 import org.apache.doris.thrift.TNetworkAddress;
@@ -106,11 +110,7 @@ public class HeartbeatMgr extends MasterDaemon {
 
         // send frontend heartbeat
         List<Frontend> frontends = Catalog.getCurrentCatalog().getFrontends(null);
-        String masterFeNodeName = "";
         for (Frontend frontend : frontends) {
-            if (frontend.getHost().equals(masterInfo.get().getNetworkAddress().getHostname())) {
-                masterFeNodeName = frontend.getNodeName();
-            }
             FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(frontend,
                     Catalog.getCurrentCatalog().getClusterId(),
                     Catalog.getCurrentCatalog().getToken());
@@ -151,12 +151,6 @@ public class HeartbeatMgr extends MasterDaemon {
             }
         } // end for all results
 
-        // we also add a 'mocked' master Frontends heartbeat response to synchronize master info to other Frontends.
-        hbPackage.addHbResponse(new FrontendHbResponse(masterFeNodeName,
-                Config.query_port, Config.rpc_port, Catalog.getCurrentCatalog().getEditLog().getMaxJournalId(),
-                System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH));
-
-        // write edit log
         Catalog.getCurrentCatalog().getEditLog().logHeartbeat(hbPackage);
     }
 
@@ -221,7 +215,7 @@ public class HeartbeatMgr extends MasterDaemon {
             TNetworkAddress beAddr = new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort());
             boolean ok = false;
             try {
-                client = ClientPool.heartbeatPool.borrowObject(beAddr);
+                client = ClientPool.backendHeartbeatPool.borrowObject(beAddr);
 
                 TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get());
                 copiedMasterInfo.setBackendIp(backend.getHost());
@@ -256,9 +250,9 @@ public class HeartbeatMgr extends MasterDaemon {
                         Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
             } finally {
                 if (ok) {
-                    ClientPool.heartbeatPool.returnObject(beAddr, client);
+                    ClientPool.backendHeartbeatPool.returnObject(beAddr, client);
                 } else {
-                    ClientPool.heartbeatPool.invalidateObject(beAddr, client);
+                    ClientPool.backendHeartbeatPool.invalidateObject(beAddr, client);
                 }
             }
         }
@@ -282,13 +276,20 @@ public class HeartbeatMgr extends MasterDaemon {
                 // heartbeat to self
                 if (Catalog.getCurrentCatalog().isReady()) {
                     return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port,
-                            Catalog.getCurrentCatalog().getReplayedJournalId(), System.currentTimeMillis(),
+                            Catalog.getCurrentCatalog().getMaxJournalId(), System.currentTimeMillis(),
                             Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
                 } else {
                     return new FrontendHbResponse(fe.getNodeName(), "not ready");
                 }
             }
+            if (Config.enable_fe_heartbeat_by_thrift) {
+                return getHeartbeatResponseByThrift();
+            } else {
+                return getHeartbeatResponseByHttp();
+            }
+        }
 
+        private HeartbeatResponse getHeartbeatResponseByHttp() {
             String url = "http://" + fe.getHost() + ":" + Config.http_port
                     + "/api/bootstrap?cluster_id=" + clusterId + "&token=" + token;
             try {
@@ -338,6 +339,35 @@ public class HeartbeatMgr extends MasterDaemon {
                         Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
             }
         }
+
+        /** Pings the frontend over thrift; a FAILED result or any exception yields a message-only response. */
+        private HeartbeatResponse getHeartbeatResponseByThrift() {
+            // NOTE(review): fe.getRpcPort() looks to be filled in only from an earlier heartbeat
+            // response, so use the configured port, as the http path does with Config.http_port.
+            TNetworkAddress addr = new TNetworkAddress(fe.getHost(), Config.rpc_port);
+            FrontendService.Client client = null;
+            boolean ok = false;
+            try {
+                client = ClientPool.frontendHeartbeatPool.borrowObject(addr);
+                TFrontendPingFrontendResult result =
+                        client.ping(new TFrontendPingFrontendRequest(clusterId, token));
+                ok = true; // the rpc round trip completed, so the client goes back to the pool
+                if (result.getStatus() != TFrontendPingFrontendStatusCode.OK) {
+                    return new FrontendHbResponse(fe.getNodeName(), result.getMsg());
+                }
+                return new FrontendHbResponse(fe.getNodeName(), result.getQueryPort(), result.getRpcPort(),
+                        result.getReplayedJournalId(), System.currentTimeMillis(), result.getVersion());
+            } catch (Exception e) {
+                return new FrontendHbResponse(fe.getNodeName(),
+                        Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
+            } finally {
+                if (ok) {
+                    ClientPool.frontendHeartbeatPool.returnObject(addr, client);
+                } else {
+                    ClientPool.frontendHeartbeatPool.invalidateObject(addr, client);
+                }
+            }
+        }
     }
 
     // broker heartbeat handler
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java
index bb2aaa0..d1089d0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.system;
 import mockit.Expectations;
 import org.apache.doris.catalog.Catalog;
 import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.GenericPool;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.util.Util;
@@ -27,12 +28,17 @@ import org.apache.doris.ha.FrontendNodeType;
 import org.apache.doris.system.HeartbeatMgr.BrokerHeartbeatHandler;
 import org.apache.doris.system.HeartbeatMgr.FrontendHeartbeatHandler;
 import org.apache.doris.system.HeartbeatResponse.HbStatus;
+import org.apache.doris.thrift.FrontendService;
 import org.apache.doris.thrift.TBrokerOperationStatus;
 import org.apache.doris.thrift.TBrokerOperationStatusCode;
 import org.apache.doris.thrift.TBrokerPingBrokerRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendResult;
+import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
 import org.apache.doris.thrift.TNetworkAddress;
 import org.apache.doris.thrift.TPaloBrokerService;
 
+import org.apache.thrift.TException;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,7 +73,7 @@ public class HeartbeatMgrTest {
     }
 
     @Test
-    public void testFrontendHbHandler() {
+    public void testFrontendHbHandlerWithHttp() {
         new MockUp<Util>() {
             @Mock
             public String getResultForUrl(String urlStr, String encodedAuthInfo,
@@ -90,11 +96,11 @@ public class HeartbeatMgrTest {
                 }
             }
         };
-
+        Config.enable_fe_heartbeat_by_thrift = false;
+        System.out.println(" config " + Config.enable_fe_heartbeat_by_thrift);
         Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", "192.168.1.1", 9010);
         FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 12345, "abcd");
         HeartbeatResponse response = handler.call();
-
         Assert.assertTrue(response instanceof FrontendHbResponse);
         FrontendHbResponse hbResponse = (FrontendHbResponse) response;
         Assert.assertEquals(191224, hbResponse.getReplayedJournalId());
@@ -113,7 +119,78 @@ public class HeartbeatMgrTest {
         Assert.assertEquals(0, hbResponse.getQueryPort());
         Assert.assertEquals(0, hbResponse.getRpcPort());
         Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus());
+    }
 
+    @Test
+    public void testFrontendHbHandlerWithThirft(@Mocked FrontendService.Client client) throws TException {
+        new MockUp<GenericPool<FrontendService.Client>>() {
+            @Mock
+            public FrontendService.Client borrowObject(TNetworkAddress address) throws Exception {
+                return client;
+            }
+
+            @Mock
+            public void returnObject(TNetworkAddress address, FrontendService.Client object) {
+                return;
+            }
+
+            @Mock
+            public void invalidateObject(TNetworkAddress address, FrontendService.Client object) {
+                return;
+            }
+        };
+
+        TFrontendPingFrontendRequest normalRequest = new TFrontendPingFrontendRequest(12345, "abcd");
+        TFrontendPingFrontendResult normalResult = new TFrontendPingFrontendResult();
+        normalResult.setStatus(TFrontendPingFrontendStatusCode.OK);
+        normalResult.setMsg("success");
+        normalResult.setReplayedJournalId(191224);
+        normalResult.setQueryPort(9131);
+        normalResult.setRpcPort(9121);
+        normalResult.setVersion("test");
+
+        TFrontendPingFrontendRequest badRequest = new TFrontendPingFrontendRequest(12345, "abcde");
+        TFrontendPingFrontendResult badResult = new TFrontendPingFrontendResult();
+        badResult.setStatus(TFrontendPingFrontendStatusCode.FAILED);
+        badResult.setMsg("not ready");
+
+        new Expectations() {
+            {
+                client.ping(normalRequest);
+                minTimes = 0;
+                result = normalResult;
+
+                client.ping(badRequest);
+                minTimes = 0;
+                result = badResult;
+            }
+        };
+
+        Config.enable_fe_heartbeat_by_thrift = true;
+
+        Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", "192.168.1.1", 9010);
+        FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 12345, "abcd");
+        HeartbeatResponse response = handler.call();
+
+        Assert.assertTrue(response instanceof FrontendHbResponse);
+        FrontendHbResponse hbResponse = (FrontendHbResponse) response;
+        Assert.assertEquals(191224, hbResponse.getReplayedJournalId());
+        Assert.assertEquals(9131, hbResponse.getQueryPort());
+        Assert.assertEquals(9121, hbResponse.getRpcPort());
+        Assert.assertEquals(HbStatus.OK, hbResponse.getStatus());
+        Assert.assertEquals("test", hbResponse.getVersion());
+
+        Frontend fe2 = new Frontend(FrontendNodeType.FOLLOWER, "test2", "192.168.1.2", 9010);
+        handler = new FrontendHeartbeatHandler(fe2, 12345, "abcde");
+        response = handler.call();
+
+        Assert.assertTrue(response instanceof FrontendHbResponse);
+        hbResponse = (FrontendHbResponse) response;
+        Assert.assertEquals(0, hbResponse.getReplayedJournalId());
+        Assert.assertEquals(0, hbResponse.getQueryPort());
+        Assert.assertEquals(0, hbResponse.getRpcPort());
+        Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus());
+        Assert.assertEquals("not ready", hbResponse.getMsg());
     }
 
     @Test
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 445d027..35182f5 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -657,6 +657,25 @@ struct TSnapshotLoaderReportRequest {
     5: optional i32 total_num
 }
 
+enum TFrontendPingFrontendStatusCode { // outcome of FrontendService.ping
+   OK = 0, // the frontend is ready and the request's cluster id and token matched
+   FAILED = 1 // the frontend is not ready, or the cluster id or token did not match
+}
+
+struct TFrontendPingFrontendRequest { // argument of FrontendService.ping
+   1: required i32 clusterId // compared with the receiving frontend's cluster id
+   2: required string token // compared with the receiving frontend's token
+}
+
+struct TFrontendPingFrontendResult { // return value of FrontendService.ping
+    1: required TFrontendPingFrontendStatusCode status
+    2: required string msg // "success" when status is OK, otherwise the failure reason
+    3: required i32 queryPort // fields 3-5 carry meaningful values only when status is OK
+    4: required i32 rpcPort
+    5: required i64 replayedJournalId
+    6: required string version // when status is OK: build version and short hash joined by "-"
+}
+
 service FrontendService {
     TGetDbsResult getDbNames(1:TGetDbsParams params)
     TGetTablesResult getTableNames(1:TGetTablesParams params)
@@ -692,4 +711,6 @@ service FrontendService {
     TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)
 
     Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)
-}
+
+    TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org