You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/21 08:00:15 UTC
[07/29] incubator-distributedlog git commit: dl: add flag to enable thrift mux on DL Client
dl: add flag to enable thrift mux on DL Client
RB_ID=839555
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/d3a97bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/d3a97bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/d3a97bc0
Branch: refs/heads/merge/DL-98
Commit: d3a97bc0dde0c25516840725599ac46fa03601ab
Parents: 98dc9ab
Author: Dave Rusek <dr...@twitter.com>
Authored: Mon Jun 6 16:50:25 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:39:04 2016 -0800
----------------------------------------------------------------------
.../distributedlog/service/MonitorService.java | 28 ++++++++++++++------
.../service/MonitorServiceApp.java | 1 +
2 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d3a97bc0/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
index 2683b47..6b58eff 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
@@ -85,6 +85,7 @@ public class MonitorService implements NamespaceListener {
private int heartbeatEveryChecks = 0;
private int instanceId = -1;
private int totalInstances = -1;
+ private boolean isThriftMux = false;
// Options
private final Optional<String> uriArg;
@@ -98,6 +99,7 @@ public class MonitorService implements NamespaceListener {
private final Optional<Integer> heartbeatEveryChecksArg;
private final Optional<Boolean> handshakeWithClientInfoArg;
private final Optional<Boolean> watchNamespaceChangesArg;
+ private final Optional<Boolean> isThriftMuxArg;
// Stats
private final StatsProvider statsProvider;
@@ -224,6 +226,7 @@ public class MonitorService implements NamespaceListener {
Optional<Integer> heartbeatEveryChecksArg,
Optional<Boolean> handshakeWithClientInfoArg,
Optional<Boolean> watchNamespaceChangesArg,
+ Optional<Boolean> isThriftMuxArg,
StatsReceiver statsReceiver,
StatsProvider statsProvider) {
// options
@@ -238,6 +241,7 @@ public class MonitorService implements NamespaceListener {
this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
this.watchNamespaceChangesArg = watchNamespaceChangesArg;
+ this.isThriftMuxArg = isThriftMuxArg;
// Stats
this.statsReceiver = statsReceiver;
@@ -275,6 +279,7 @@ public class MonitorService implements NamespaceListener {
}
handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
+ isThriftMux = isThriftMuxArg.isPresent();
URI uri = URI.create(uriArg.get());
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
if (confFileArg.isPresent()) {
@@ -300,8 +305,22 @@ public class MonitorService implements NamespaceListener {
ServerSet[] remotes = new ServerSet[serverSets.length - 1];
System.arraycopy(serverSets, 1, remotes, 0, remotes.length);
+ ClientBuilder finagleClientBuilder = ClientBuilder.get()
+ .connectTimeout(Duration.fromSeconds(1))
+ .tcpConnectTimeout(Duration.fromSeconds(1))
+ .requestTimeout(Duration.fromSeconds(2))
+ .keepAlive(true)
+ .failFast(false);
+
+ if (!isThriftMux) {
+ finagleClientBuilder = finagleClientBuilder
+ .hostConnectionLimit(2)
+ .hostConnectionCoresize(2);
+ }
+
dlClient = DistributedLogClientBuilder.newBuilder()
.name("monitor")
+ .thriftmux(isThriftMux)
.clientId(ClientId$.MODULE$.apply("monitor"))
.redirectBackoffMaxMs(50)
.redirectBackoffStartMs(100)
@@ -310,14 +329,7 @@ public class MonitorService implements NamespaceListener {
.serverSets(local, remotes)
.streamNameRegex(streamRegex)
.handshakeWithClientInfo(handshakeWithClientInfo)
- .clientBuilder(ClientBuilder.get()
- .connectTimeout(Duration.fromSeconds(1))
- .tcpConnectTimeout(Duration.fromSeconds(1))
- .requestTimeout(Duration.fromSeconds(2))
- .hostConnectionLimit(2)
- .hostConnectionCoresize(2)
- .keepAlive(true)
- .failFast(false))
+ .clientBuilder(finagleClientBuilder)
.statsReceiver(monitorReceiver.scope("client"))
.buildMonitorClient();
runMonitor(dlConf, uri);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d3a97bc0/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
index 90d3566..a51a6a9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
@@ -99,6 +99,7 @@ public class MonitorServiceApp {
getOptionalIntegerArg(cmdline, "hck"),
getOptionalBooleanArg(cmdline, "hsci"),
getOptionalBooleanArg(cmdline, "w"),
+ getOptionalBooleanArg(cmdline, "mx"),
statsReceiver,
statsProvider);