You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@distributedlog.apache.org by si...@apache.org on 2016/09/07 08:40:20 UTC

incubator-distributedlog git commit: DL-39: Use a distributedlog uri to configure write proxy routing address

Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master b9d21568c -> 05a8daa2a


DL-39: Use a distributedlog uri to configure write proxy routing address

Author: Jon Derrick <jo...@gmail.com>

Reviewers: Sijie Guo <si...@apache.org>

Closes #18 from jderrickk/jd/uri_resolver


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/05a8daa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/05a8daa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/05a8daa2

Branch: refs/heads/master
Commit: 05a8daa2aa468fe82f6ca919f7d93f0bab86c522
Parents: b9d2156
Author: Jon Derrick <jo...@gmail.com>
Authored: Wed Sep 7 01:40:17 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Wed Sep 7 01:40:17 2016 -0700

----------------------------------------------------------------------
 distributedlog-benchmark/bin/dbench             |  2 --
 distributedlog-benchmark/conf/dlogenv.sh        |  3 ---
 .../distributedlog/benchmark/Benchmarker.java   | 11 +++++---
 .../distributedlog/benchmark/ReaderWorker.java  |  5 +++-
 .../distributedlog/benchmark/WriterWorker.java  |  9 +++++--
 .../service/DistributedLogClientBuilder.java    | 28 ++++++++++++++++++++
 docs/operations/deployment.rst                  |  4 +--
 7 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/bin/dbench
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/bin/dbench b/distributedlog-benchmark/bin/dbench
index 667c6a5..b84e133 100755
--- a/distributedlog-benchmark/bin/dbench
+++ b/distributedlog-benchmark/bin/dbench
@@ -182,7 +182,6 @@ elif [ $COMMAND == "write" ]; then
     --max-rate ${MAX_RATE} \\
     --change-rate ${CHANGE_RATE} \\
     --change-interval ${CHANGE_RATE_INTERVAL} \\
-    --finagle-name ${DL_WP_FINAGLE_NAME}
     """
     BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_WRITE_ARGS} \\ --mode write \\"
     exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@
@@ -191,7 +190,6 @@ elif [ $COMMAND == "read" ]; then
     --readers-per-stream ${NUM_READERS_PER_STREAM} \\
     --max-stream-id ${MAX_STREAM_ID} \\
     --truncation-interval ${TRUNCATION_INTERVAL} \\
-    --finagle-name ${DL_WP_FINAGLE_NAME}
     """
     BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_READ_ARGS} \\ --mode read \\"
     exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/conf/dlogenv.sh
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/conf/dlogenv.sh b/distributedlog-benchmark/conf/dlogenv.sh
index 608294e..205606b 100644
--- a/distributedlog-benchmark/conf/dlogenv.sh
+++ b/distributedlog-benchmark/conf/dlogenv.sh
@@ -76,9 +76,6 @@ CHANGE_RATE=100
 # Rate change interval, in seconds
 CHANGE_RATE_INTERVAL=300
 
-# DL Write Proxy Finagle Name
-DL_WP_FINAGLE_NAME='zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy'
-
 ##########
 # Reader 
 ##########

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
index 25b70a4..7114dbb 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
@@ -265,8 +265,8 @@ public class Benchmarker {
     }
 
     Worker runWriter() {
-        Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty(),
-                "either serverset paths or finagle-names required");
+        Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
+                "either serverset paths, finagle-names or uri required");
         Preconditions.checkArgument(msgSize > 0, "messagesize must be greater than 0");
         Preconditions.checkArgument(rate > 0, "rate must be greater than 0");
         Preconditions.checkArgument(maxRate >= rate, "max rate must be greater than rate");
@@ -278,6 +278,7 @@ public class Benchmarker {
                 new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
         return createWriteWorker(
                 streamPrefix,
+                dlUri,
                 null == startStreamId ? shardId * numStreams : startStreamId,
                 null == endStreamId ? (shardId + 1) * numStreams : endStreamId,
                 rateLimiter,
@@ -299,6 +300,7 @@ public class Benchmarker {
 
     protected WriterWorker createWriteWorker(
             String streamPrefix,
+            URI uri,
             int startStreamId,
             int endStreamId,
             ShiftableRateLimiter rateLimiter,
@@ -318,6 +320,7 @@ public class Benchmarker {
             boolean enableBatching) {
         return new WriterWorker(
                 streamPrefix,
+                uri,
                 startStreamId,
                 endStreamId,
                 rateLimiter,
@@ -360,8 +363,8 @@ public class Benchmarker {
     }
 
     Worker runReader() throws IOException {
-        Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty(),
-                "either serverset paths or finagle-names required");
+        Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
+                "either serverset paths, finagle-names or dlUri required");
         Preconditions.checkArgument(concurrency > 0, "concurrency must be greater than 0");
         Preconditions.checkArgument(truncationInterval > 0, "truncation interval should be greater than 0");
         return runReaderInternal(serversetPaths, finagleNames, truncationInterval);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
index 4bc55d2..5b34939 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
@@ -254,7 +254,7 @@ public class ReaderWorker implements Worker {
 
                 builder = builder.finagleNameStrs(local, remotes);
                 LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames);
-            } else {
+            } else if (serverSets.length != 0){
                 ServerSet local = this.serverSets[0].getServerSet();
                 ServerSet[] remotes = new ServerSet[this.serverSets.length - 1];
                 for (int i = 1; i < serverSets.length; i++) {
@@ -263,6 +263,9 @@ public class ReaderWorker implements Worker {
 
                 builder = builder.serverSets(local, remotes);
                 LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths);
+            } else {
+                builder = builder.uri(uri);
+                LOG.info("Initialized distributedlog client for namespace {}", uri);
             }
             dlc = builder.build();
         } else {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
index 5a961df..92fc090 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
@@ -65,6 +65,7 @@ public class WriterWorker implements Worker {
     final int hostConnectionLimit;
     final ExecutorService executorService;
     final ShiftableRateLimiter rateLimiter;
+    final URI dlUri;
     final DLZkServerSet[] serverSets;
     final List<String> finagleNames;
     final Random random;
@@ -86,6 +87,7 @@ public class WriterWorker implements Worker {
     final StatsLogger dlErrorCodeLogger;
 
     public WriterWorker(String streamPrefix,
+                        URI uri,
                         int startStreamId,
                         int endStreamId,
                         ShiftableRateLimiter rateLimiter,
@@ -106,6 +108,7 @@ public class WriterWorker implements Worker {
         Preconditions.checkArgument(startStreamId <= endStreamId);
         Preconditions.checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
         this.streamPrefix = streamPrefix;
+        this.dlUri = uri;
         this.startStreamId = startStreamId;
         this.endStreamId = endStreamId;
         this.rateLimiter = rateLimiter;
@@ -184,19 +187,21 @@ public class WriterWorker implements Worker {
             .handshakeTracing(true)
             .name("writer");
 
-        if (serverSets.length == 0) {
+        if (!finagleNames.isEmpty()) {
             String local = finagleNames.get(0);
             String[] remotes = new String[finagleNames.size() - 1];
             finagleNames.subList(1, finagleNames.size()).toArray(remotes);
 
             builder = builder.finagleNameStrs(local, remotes);
-        } else {
+        } else if (serverSets.length != 0){
             ServerSet local = serverSets[0].getServerSet();
             ServerSet[] remotes = new ServerSet[serverSets.length - 1];
             for (int i = 1; i < serverSets.length; i++) {
                 remotes[i-1] = serverSets[i].getServerSet();
             }
             builder = builder.serverSets(local, remotes);
+        } else {
+            builder = builder.uri(dlUri);
         }
 
         return builder.build();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
index c4a82e5..5c6a54b 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
@@ -32,11 +32,16 @@ import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.stats.NullStatsReceiver;
 import com.twitter.finagle.stats.StatsReceiver;
 import com.twitter.finagle.thrift.ClientId;
+import org.apache.commons.lang.StringUtils;
 
 import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Random;
 
 public final class DistributedLogClientBuilder {
 
+    private static final Random random = new Random(System.currentTimeMillis());
+
     private String _name = null;
     private ClientId _clientId = null;
     private RoutingService.Builder _routingServiceBuilder = null;
@@ -171,6 +176,29 @@ public final class DistributedLogClientBuilder {
     }
 
     /**
+     * URI to access proxy services. The write proxies are assumed to be announced under
+     * `.write_proxy` of the provided namespace uri.
+     * <p>
+     * The builder converts the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to a
+     * zookeeper-serverset-based finagle name string (`zk!{zkserver}!/path/to/namespace/.write_proxy`).
+     *
+     * @param uri namespace uri to access the serverset of write proxies
+     * @return distributedlog builder
+     */
+    public DistributedLogClientBuilder uri(URI uri) {
+        DistributedLogClientBuilder newBuilder = newBuilder(this);
+        String zkServers = uri.getAuthority().replace(";", ",");
+        String[] zkServerList = StringUtils.split(zkServers, ',');
+        String finagleNameStr = String.format(
+                "zk!%s!%s/.write_proxy",
+                zkServerList[random.nextInt(zkServerList.length)], // zk server
+                uri.getPath());
+        newBuilder._routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
+        newBuilder._enableRegionStats = false;
+        return newBuilder;
+    }
+
+    /**
      * Address of write proxy to connect.
      *
      * @param address

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/docs/operations/deployment.rst
----------------------------------------------------------------------
diff --git a/docs/operations/deployment.rst b/docs/operations/deployment.rst
index 08191f4..41240db 100644
--- a/docs/operations/deployment.rst
+++ b/docs/operations/deployment.rst
@@ -464,9 +464,7 @@ Write Proxy Naming
 ++++++++++++++++++
 
 The `dlog-daemon.sh` script starts the write proxy by announcing it to the `.write_proxy` path under
-the dl namespace. So you could use `zk!<zkservers>!/<namespace_path>/.write_proxy` as the finagle name
-to access the write proxy cluster. It is `zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy`
-in the above example.
+the dl namespace. So you can pass the namespace uri to the distributedlog client builder to access the write proxy cluster.
 
 Verify the setup
 ++++++++++++++++