You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2020/12/16 15:38:34 UTC
[incubator-doris] branch master updated: [Heartbeat] Support fe
heartbeat use thrift protocol to get stable response (#5027)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new dfa4133 [Heartbeat] Support fe heartbeat use thrift protocol to get stable response (#5027)
dfa4133 is described below
commit dfa413335f54b72839f1fc47bb1100d3b8f85bf7
Author: caiconghui <55...@users.noreply.github.com>
AuthorDate: Wed Dec 16 23:38:04 2020 +0800
[Heartbeat] Support fe heartbeat use thrift protocol to get stable response (#5027)
This PR lets the FE master obtain FE heartbeat responses over the Thrift protocol instead of HTTP, to avoid unstable HTTP heartbeats (read timeouts).
---
.../java/org/apache/doris/common/ClientPool.java | 6 +-
.../main/java/org/apache/doris/common/Config.java | 9 +++
.../apache/doris/service/FrontendServiceImpl.java | 38 ++++++++++
.../java/org/apache/doris/system/HeartbeatMgr.java | 58 +++++++++++----
.../org/apache/doris/system/HeartbeatMgrTest.java | 83 +++++++++++++++++++++-
gensrc/thrift/FrontendService.thrift | 23 +++++-
6 files changed, 197 insertions(+), 20 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
index 0f0ac40..62b3d12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/ClientPool.java
@@ -62,8 +62,11 @@ public class ClientPool {
brokerPoolConfig.setMaxWaitMillis(500); // wait for the connection
}
- public static GenericPool<HeartbeatService.Client> heartbeatPool =
- new GenericPool("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs);
+ public static GenericPool<HeartbeatService.Client> backendHeartbeatPool =
+ new GenericPool<>("HeartbeatService", heartbeatConfig, heartbeatTimeoutMs);
+ // Separate pool for FE-to-FE heartbeats: uses the heartbeat config/timeout, not frontendPool's backend ones.
+ public static GenericPool<FrontendService.Client> frontendHeartbeatPool =
+ new GenericPool<>("FrontendService", heartbeatConfig, heartbeatTimeoutMs);
public static GenericPool<FrontendService.Client> frontendPool =
new GenericPool("FrontendService", backendConfig, backendTimeoutMs);
public static GenericPool<BackendService.Client> backendPool =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
index 6560995..83e5237 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/Config.java
@@ -1301,4 +1301,13 @@ public class Config extends ConfigBase {
*/
@ConfField
public static boolean enable_alpha_rowset = false;
+
+ /**
+ * Whether the master FE fetches heartbeat responses from the other FEs over thrift instead of
+ * http. It was added to work around read_timeout errors seen with http-based FE heartbeats.
+ * For compatibility with older FE versions the default is false; set it to true only after
+ * every FE has been upgraded, otherwise heartbeats to FEs without the thrift ping will fail.
+ */
+ @ConfField(mutable = true, masterOnly = true)
+ public static boolean enable_fe_heartbeat_by_thrift = false;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index ca93063..7cd1c64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -37,6 +37,7 @@ import org.apache.doris.common.PatternMatcher;
import org.apache.doris.common.ThriftServerContext;
import org.apache.doris.common.ThriftServerEventProcessor;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.Version;
import org.apache.doris.load.EtlStatus;
import org.apache.doris.load.LoadJob;
import org.apache.doris.load.MiniEtlTaskInfo;
@@ -64,6 +65,9 @@ import org.apache.doris.thrift.TExecPlanFragmentParams;
import org.apache.doris.thrift.TFeResult;
import org.apache.doris.thrift.TFetchResourceResult;
import org.apache.doris.thrift.TFinishTaskRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendResult;
+import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.TGetDbsParams;
import org.apache.doris.thrift.TGetDbsResult;
import org.apache.doris.thrift.TGetTablesParams;
@@ -959,6 +963,50 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return new TStatus(TStatusCode.CANCELLED);
}
+ /**
+ * Handles a heartbeat ping from the master FE: checks that this FE is ready and that the
+ * request's cluster id and token match ours, then reports this FE's query/rpc ports,
+ * replayed journal id and version. Failures are reported via status/msg, not exceptions.
+ */
+ @Override
+ public TFrontendPingFrontendResult ping(TFrontendPingFrontendRequest request) throws TException {
+ TFrontendPingFrontendResult result = new TFrontendPingFrontendResult();
+ // Every field of TFrontendPingFrontendResult is "required" in thrift, and writing a result
+ // whose required object fields (msg, version) are unset fails with "Required field ... was
+ // not present". So give every field a default up front, including on the failure paths.
+ result.setStatus(TFrontendPingFrontendStatusCode.OK);
+ result.setMsg("");
+ result.setQueryPort(0);
+ result.setRpcPort(0);
+ result.setReplayedJournalId(0);
+ result.setVersion(Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
+
+ Catalog catalog = Catalog.getCurrentCatalog();
+ if (!catalog.isReady()) {
+ result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
+ result.setMsg("not ready");
+ return result;
+ }
+ if (request.getClusterId() != catalog.getClusterId()) {
+ result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
+ result.setMsg("invalid cluster id: " + catalog.getClusterId());
+ return result;
+ }
+ if (!request.getToken().equals(catalog.getToken())) {
+ // Don't echo the expected token: any client that can reach this port would learn it.
+ result.setStatus(TFrontendPingFrontendStatusCode.FAILED);
+ result.setMsg("invalid token");
+ return result;
+ }
+
+ // cluster id and token are valid: report replayed journal id and this FE's ports
+ result.setMsg("success");
+ result.setReplayedJournalId(catalog.getReplayedJournalId());
+ result.setQueryPort(Config.query_port);
+ result.setRpcPort(Config.rpc_port);
+ return result;
+ }
+
private TNetworkAddress getClientAddr() {
ThriftServerContext connectionContext = ThriftServerEventProcessor.getConnectionContext();
// For NonBlockingServer, we can not get client ip.
diff --git a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
index cd5da57..3b197b8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/HeartbeatMgr.java
@@ -30,12 +30,16 @@ import org.apache.doris.http.rest.BootstrapFinishAction;
import org.apache.doris.persist.HbPackage;
import org.apache.doris.service.FrontendOptions;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
+import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.HeartbeatService;
import org.apache.doris.thrift.TBackendInfo;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPingBrokerRequest;
import org.apache.doris.thrift.TBrokerVersion;
+import org.apache.doris.thrift.TFrontendPingFrontendRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendResult;
+import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.THeartbeatResult;
import org.apache.doris.thrift.TMasterInfo;
import org.apache.doris.thrift.TNetworkAddress;
@@ -106,11 +110,7 @@ public class HeartbeatMgr extends MasterDaemon {
// send frontend heartbeat
List<Frontend> frontends = Catalog.getCurrentCatalog().getFrontends(null);
- String masterFeNodeName = "";
for (Frontend frontend : frontends) {
- if (frontend.getHost().equals(masterInfo.get().getNetworkAddress().getHostname())) {
- masterFeNodeName = frontend.getNodeName();
- }
FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(frontend,
Catalog.getCurrentCatalog().getClusterId(),
Catalog.getCurrentCatalog().getToken());
@@ -151,12 +151,6 @@ public class HeartbeatMgr extends MasterDaemon {
}
} // end for all results
- // we also add a 'mocked' master Frontends heartbeat response to synchronize master info to other Frontends.
- hbPackage.addHbResponse(new FrontendHbResponse(masterFeNodeName,
- Config.query_port, Config.rpc_port, Catalog.getCurrentCatalog().getEditLog().getMaxJournalId(),
- System.currentTimeMillis(), Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH));
-
- // write edit log
Catalog.getCurrentCatalog().getEditLog().logHeartbeat(hbPackage);
}
@@ -221,7 +215,7 @@ public class HeartbeatMgr extends MasterDaemon {
TNetworkAddress beAddr = new TNetworkAddress(backend.getHost(), backend.getHeartbeatPort());
boolean ok = false;
try {
- client = ClientPool.heartbeatPool.borrowObject(beAddr);
+ client = ClientPool.backendHeartbeatPool.borrowObject(beAddr);
TMasterInfo copiedMasterInfo = new TMasterInfo(masterInfo.get());
copiedMasterInfo.setBackendIp(backend.getHost());
@@ -256,9 +250,9 @@ public class HeartbeatMgr extends MasterDaemon {
Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
} finally {
if (ok) {
- ClientPool.heartbeatPool.returnObject(beAddr, client);
+ ClientPool.backendHeartbeatPool.returnObject(beAddr, client);
} else {
- ClientPool.heartbeatPool.invalidateObject(beAddr, client);
+ ClientPool.backendHeartbeatPool.invalidateObject(beAddr, client);
}
}
}
@@ -282,13 +276,21 @@ public class HeartbeatMgr extends MasterDaemon {
// heartbeat to self
if (Catalog.getCurrentCatalog().isReady()) {
return new FrontendHbResponse(fe.getNodeName(), Config.query_port, Config.rpc_port,
- Catalog.getCurrentCatalog().getReplayedJournalId(), System.currentTimeMillis(),
+ Catalog.getCurrentCatalog().getMaxJournalId(), System.currentTimeMillis(),
Version.DORIS_BUILD_VERSION + "-" + Version.DORIS_BUILD_SHORT_HASH);
} else {
return new FrontendHbResponse(fe.getNodeName(), "not ready");
}
}
+ if (Config.enable_fe_heartbeat_by_thrift) {
+ return getHeartbeatResponseByThrift();
+ } else {
+ return getHeartbeatResponseByHttp();
+ }
+ }
+
+ private HeartbeatResponse getHeartbeatResponseByHttp() {
String url = "http://" + fe.getHost() + ":" + Config.http_port
+ "/api/bootstrap?cluster_id=" + clusterId + "&token=" + token;
try {
@@ -338,6 +340,37 @@ public class HeartbeatMgr extends MasterDaemon {
Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
}
}
+
+ // Fetches the FE heartbeat through the FrontendService.ping thrift RPC instead of the http bootstrap API.
+ private HeartbeatResponse getHeartbeatResponseByThrift() {
+ FrontendService.Client client = null;
+ // Dial this master's own Config.rpc_port, just as the http path uses Config.http_port: fe.getRpcPort()
+ // is only filled in from the FE's own heartbeat responses, so it may still be 0 for an FE never reached.
+ TNetworkAddress addr = new TNetworkAddress(fe.getHost(), Config.rpc_port);
+ boolean ok = false;
+ try {
+ client = ClientPool.frontendHeartbeatPool.borrowObject(addr);
+ TFrontendPingFrontendRequest request = new TFrontendPingFrontendRequest(clusterId, token);
+ TFrontendPingFrontendResult result = client.ping(request);
+ ok = true;
+ if (result.getStatus() == TFrontendPingFrontendStatusCode.OK) {
+ return new FrontendHbResponse(fe.getNodeName(), result.getQueryPort(),
+ result.getRpcPort(), result.getReplayedJournalId(),
+ System.currentTimeMillis(), result.getVersion());
+ } else {
+ return new FrontendHbResponse(fe.getNodeName(), result.getMsg());
+ }
+ } catch (Exception e) {
+ return new FrontendHbResponse(fe.getNodeName(),
+ Strings.isNullOrEmpty(e.getMessage()) ? "got exception" : e.getMessage());
+ } finally {
+ if (ok) {
+ ClientPool.frontendHeartbeatPool.returnObject(addr, client);
+ } else {
+ ClientPool.frontendHeartbeatPool.invalidateObject(addr, client);
+ }
+ }
+ }
}
// broker heartbeat handler
diff --git a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java
index bb2aaa0..d1089d0 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/system/HeartbeatMgrTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.system;
import mockit.Expectations;
import org.apache.doris.catalog.Catalog;
import org.apache.doris.catalog.FsBroker;
+import org.apache.doris.common.Config;
import org.apache.doris.common.GenericPool;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Util;
@@ -27,12 +28,17 @@ import org.apache.doris.ha.FrontendNodeType;
import org.apache.doris.system.HeartbeatMgr.BrokerHeartbeatHandler;
import org.apache.doris.system.HeartbeatMgr.FrontendHeartbeatHandler;
import org.apache.doris.system.HeartbeatResponse.HbStatus;
+import org.apache.doris.thrift.FrontendService;
import org.apache.doris.thrift.TBrokerOperationStatus;
import org.apache.doris.thrift.TBrokerOperationStatusCode;
import org.apache.doris.thrift.TBrokerPingBrokerRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendRequest;
+import org.apache.doris.thrift.TFrontendPingFrontendResult;
+import org.apache.doris.thrift.TFrontendPingFrontendStatusCode;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TPaloBrokerService;
+import org.apache.thrift.TException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -67,7 +73,7 @@ public class HeartbeatMgrTest {
}
@Test
- public void testFrontendHbHandler() {
+ public void testFrontendHbHandlerWithHttp() {
new MockUp<Util>() {
@Mock
public String getResultForUrl(String urlStr, String encodedAuthInfo,
@@ -90,11 +96,10 @@ public class HeartbeatMgrTest {
}
}
};
-
+ Config.enable_fe_heartbeat_by_thrift = false;
Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", "192.168.1.1", 9010);
FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 12345, "abcd");
HeartbeatResponse response = handler.call();
-
Assert.assertTrue(response instanceof FrontendHbResponse);
FrontendHbResponse hbResponse = (FrontendHbResponse) response;
Assert.assertEquals(191224, hbResponse.getReplayedJournalId());
@@ -113,7 +118,84 @@ public class HeartbeatMgrTest {
Assert.assertEquals(0, hbResponse.getQueryPort());
Assert.assertEquals(0, hbResponse.getRpcPort());
Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus());
+ }
+
+ @Test
+ public void testFrontendHbHandlerWithThrift(@Mocked FrontendService.Client client) throws TException {
+ new MockUp<GenericPool<FrontendService.Client>>() {
+ @Mock
+ public FrontendService.Client borrowObject(TNetworkAddress address) throws Exception {
+ return client;
+ }
+
+ @Mock
+ public void returnObject(TNetworkAddress address, FrontendService.Client object) {
+ return;
+ }
+
+ @Mock
+ public void invalidateObject(TNetworkAddress address, FrontendService.Client object) {
+ return;
+ }
+ };
+
+ TFrontendPingFrontendRequest normalRequest = new TFrontendPingFrontendRequest(12345, "abcd");
+ TFrontendPingFrontendResult normalResult = new TFrontendPingFrontendResult();
+ normalResult.setStatus(TFrontendPingFrontendStatusCode.OK);
+ normalResult.setMsg("success");
+ normalResult.setReplayedJournalId(191224);
+ normalResult.setQueryPort(9131);
+ normalResult.setRpcPort(9121);
+ normalResult.setVersion("test");
+
+ TFrontendPingFrontendRequest badRequest = new TFrontendPingFrontendRequest(12345, "abcde");
+ TFrontendPingFrontendResult badResult = new TFrontendPingFrontendResult();
+ badResult.setStatus(TFrontendPingFrontendStatusCode.FAILED);
+ badResult.setMsg("not ready");
+
+ new Expectations() {
+ {
+ client.ping(normalRequest);
+ minTimes = 0;
+ result = normalResult;
+
+ client.ping(badRequest);
+ minTimes = 0;
+ result = badResult;
+ }
+ };
+
+ Config.enable_fe_heartbeat_by_thrift = true;
+
+ try {
+ Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "test", "192.168.1.1", 9010);
+ FrontendHeartbeatHandler handler = new FrontendHeartbeatHandler(fe, 12345, "abcd");
+ HeartbeatResponse response = handler.call();
+
+ Assert.assertTrue(response instanceof FrontendHbResponse);
+ FrontendHbResponse hbResponse = (FrontendHbResponse) response;
+ Assert.assertEquals(191224, hbResponse.getReplayedJournalId());
+ Assert.assertEquals(9131, hbResponse.getQueryPort());
+ Assert.assertEquals(9121, hbResponse.getRpcPort());
+ Assert.assertEquals(HbStatus.OK, hbResponse.getStatus());
+ Assert.assertEquals("test", hbResponse.getVersion());
+
+ Frontend fe2 = new Frontend(FrontendNodeType.FOLLOWER, "test2", "192.168.1.2", 9010);
+ handler = new FrontendHeartbeatHandler(fe2, 12345, "abcde");
+ response = handler.call();
+
+ Assert.assertTrue(response instanceof FrontendHbResponse);
+ hbResponse = (FrontendHbResponse) response;
+ Assert.assertEquals(0, hbResponse.getReplayedJournalId());
+ Assert.assertEquals(0, hbResponse.getQueryPort());
+ Assert.assertEquals(0, hbResponse.getRpcPort());
+ Assert.assertEquals(HbStatus.BAD, hbResponse.getStatus());
+ Assert.assertEquals("not ready", hbResponse.getMsg());
+ } finally {
+ // restore the default so this global flag does not leak into other tests
+ Config.enable_fe_heartbeat_by_thrift = false;
+ }
}
@Test
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 445d027..35182f5 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -657,6 +657,27 @@ struct TSnapshotLoaderReportRequest {
5: optional i32 total_num
}
+enum TFrontendPingFrontendStatusCode {
+ OK = 0,
+ FAILED = 1
+}
+
+struct TFrontendPingFrontendRequest {
+ 1: required i32 clusterId
+ 2: required string token
+}
+
+// Every field is required: the responder must set all of them (including msg and version),
+// even when status is FAILED, otherwise the result cannot be serialized.
+struct TFrontendPingFrontendResult {
+ 1: required TFrontendPingFrontendStatusCode status
+ 2: required string msg
+ 3: required i32 queryPort
+ 4: required i32 rpcPort
+ 5: required i64 replayedJournalId
+ 6: required string version
+}
+
service FrontendService {
TGetDbsResult getDbNames(1:TGetDbsParams params)
TGetTablesResult getTableNames(1:TGetTablesParams params)
@@ -692,4 +713,7 @@ service FrontendService {
TStreamLoadPutResult streamLoadPut(1: TStreamLoadPutRequest request)
Status.TStatus snapshotLoaderReport(1: TSnapshotLoaderReportRequest request)
+
+ // Called by the master FE's HeartbeatMgr to heartbeat the other FEs.
+ TFrontendPingFrontendResult ping(1: TFrontendPingFrontendRequest request)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org