You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/21 08:00:15 UTC

[07/29] incubator-distributedlog git commit: dl: add flag to enable thrift mux on DL Client

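dl: add flag to enable thrift mux on the DL client built by MonitorService (read from the "mx" command-line argument). When the flag is set, the per-host connection limit and core size are no longer applied to the finagle client builder.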

RB_ID=839555


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/d3a97bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/d3a97bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/d3a97bc0

Branch: refs/heads/merge/DL-98
Commit: d3a97bc0dde0c25516840725599ac46fa03601ab
Parents: 98dc9ab
Author: Dave Rusek <dr...@twitter.com>
Authored: Mon Jun 6 16:50:25 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:39:04 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/service/MonitorService.java  | 28 ++++++++++++++------
 .../service/MonitorServiceApp.java              |  1 +
 2 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d3a97bc0/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
index 2683b47..6b58eff 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
@@ -85,6 +85,7 @@ public class MonitorService implements NamespaceListener {
     private int heartbeatEveryChecks = 0;
     private int instanceId = -1;
     private int totalInstances = -1;
+    private boolean isThriftMux = false;
 
     // Options
     private final Optional<String> uriArg;
@@ -98,6 +99,7 @@ public class MonitorService implements NamespaceListener {
     private final Optional<Integer> heartbeatEveryChecksArg;
     private final Optional<Boolean> handshakeWithClientInfoArg;
     private final Optional<Boolean> watchNamespaceChangesArg;
+    private final Optional<Boolean> isThriftMuxArg;
 
     // Stats
     private final StatsProvider statsProvider;
@@ -224,6 +226,7 @@ public class MonitorService implements NamespaceListener {
                    Optional<Integer> heartbeatEveryChecksArg,
                    Optional<Boolean> handshakeWithClientInfoArg,
                    Optional<Boolean> watchNamespaceChangesArg,
+                   Optional<Boolean> isThriftMuxArg,
                    StatsReceiver statsReceiver,
                    StatsProvider statsProvider) {
         // options
@@ -238,6 +241,7 @@ public class MonitorService implements NamespaceListener {
         this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
         this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
         this.watchNamespaceChangesArg = watchNamespaceChangesArg;
+        this.isThriftMuxArg = isThriftMuxArg;
 
         // Stats
         this.statsReceiver = statsReceiver;
@@ -275,6 +279,7 @@ public class MonitorService implements NamespaceListener {
         }
         handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
         watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
+        isThriftMux = isThriftMuxArg.isPresent();
         URI uri = URI.create(uriArg.get());
         DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
         if (confFileArg.isPresent()) {
@@ -300,8 +305,22 @@ public class MonitorService implements NamespaceListener {
         ServerSet[] remotes  = new ServerSet[serverSets.length - 1];
         System.arraycopy(serverSets, 1, remotes, 0, remotes.length);
 
+        ClientBuilder finagleClientBuilder = ClientBuilder.get()
+            .connectTimeout(Duration.fromSeconds(1))
+            .tcpConnectTimeout(Duration.fromSeconds(1))
+            .requestTimeout(Duration.fromSeconds(2))
+            .keepAlive(true)
+            .failFast(false);
+
+        if (!isThriftMux) {
+            finagleClientBuilder = finagleClientBuilder
+                .hostConnectionLimit(2)
+                .hostConnectionCoresize(2);
+        }
+
         dlClient = DistributedLogClientBuilder.newBuilder()
                 .name("monitor")
+                .thriftmux(isThriftMux)
                 .clientId(ClientId$.MODULE$.apply("monitor"))
                 .redirectBackoffMaxMs(50)
                 .redirectBackoffStartMs(100)
@@ -310,14 +329,7 @@ public class MonitorService implements NamespaceListener {
                 .serverSets(local, remotes)
                 .streamNameRegex(streamRegex)
                 .handshakeWithClientInfo(handshakeWithClientInfo)
-                .clientBuilder(ClientBuilder.get()
-                        .connectTimeout(Duration.fromSeconds(1))
-                        .tcpConnectTimeout(Duration.fromSeconds(1))
-                        .requestTimeout(Duration.fromSeconds(2))
-                        .hostConnectionLimit(2)
-                        .hostConnectionCoresize(2)
-                        .keepAlive(true)
-                        .failFast(false))
+                .clientBuilder(finagleClientBuilder)
                 .statsReceiver(monitorReceiver.scope("client"))
                 .buildMonitorClient();
         runMonitor(dlConf, uri);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d3a97bc0/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
index 90d3566..a51a6a9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
@@ -99,6 +99,7 @@ public class MonitorServiceApp {
                 getOptionalIntegerArg(cmdline, "hck"),
                 getOptionalBooleanArg(cmdline, "hsci"),
                 getOptionalBooleanArg(cmdline, "w"),
+                getOptionalBooleanArg(cmdline, "mx"),
                 statsReceiver,
                 statsProvider);