You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@distributedlog.apache.org by si...@apache.org on 2016/09/07 08:40:20 UTC
incubator-distributedlog git commit: DL-39: Use a distributedlog uri to configure write proxy routing address
Repository: incubator-distributedlog
Updated Branches:
refs/heads/master b9d21568c -> 05a8daa2a
DL-39: Use a distributedlog uri to configure write proxy routing address
Author: Jon Derrick <jo...@gmail.com>
Reviewers: Sijie Guo <si...@apache.org>
Closes #18 from jderrickk/jd/uri_resolver
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/05a8daa2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/05a8daa2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/05a8daa2
Branch: refs/heads/master
Commit: 05a8daa2aa468fe82f6ca919f7d93f0bab86c522
Parents: b9d2156
Author: Jon Derrick <jo...@gmail.com>
Authored: Wed Sep 7 01:40:17 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Wed Sep 7 01:40:17 2016 -0700
----------------------------------------------------------------------
distributedlog-benchmark/bin/dbench | 2 --
distributedlog-benchmark/conf/dlogenv.sh | 3 ---
.../distributedlog/benchmark/Benchmarker.java | 11 +++++---
.../distributedlog/benchmark/ReaderWorker.java | 5 +++-
.../distributedlog/benchmark/WriterWorker.java | 9 +++++--
.../service/DistributedLogClientBuilder.java | 28 ++++++++++++++++++++
docs/operations/deployment.rst | 4 +--
7 files changed, 47 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/bin/dbench
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/bin/dbench b/distributedlog-benchmark/bin/dbench
index 667c6a5..b84e133 100755
--- a/distributedlog-benchmark/bin/dbench
+++ b/distributedlog-benchmark/bin/dbench
@@ -182,7 +182,6 @@ elif [ $COMMAND == "write" ]; then
--max-rate ${MAX_RATE} \\
--change-rate ${CHANGE_RATE} \\
--change-interval ${CHANGE_RATE_INTERVAL} \\
- --finagle-name ${DL_WP_FINAGLE_NAME}
"""
BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_WRITE_ARGS} \\ --mode write \\"
exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@
@@ -191,7 +190,6 @@ elif [ $COMMAND == "read" ]; then
--readers-per-stream ${NUM_READERS_PER_STREAM} \\
--max-stream-id ${MAX_STREAM_ID} \\
--truncation-interval ${TRUNCATION_INTERVAL} \\
- --finagle-name ${DL_WP_FINAGLE_NAME}
"""
BENCH_ARGS="${BENCH_ARGS} \\ ${BENCH_READ_ARGS} \\ --mode read \\"
exec java $OPTS $JMX_ARGS com.twitter.distributedlog.benchmark.Benchmarker $BENCH_ARGS $@
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/conf/dlogenv.sh
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/conf/dlogenv.sh b/distributedlog-benchmark/conf/dlogenv.sh
index 608294e..205606b 100644
--- a/distributedlog-benchmark/conf/dlogenv.sh
+++ b/distributedlog-benchmark/conf/dlogenv.sh
@@ -76,9 +76,6 @@ CHANGE_RATE=100
# Rate change interval, in seconds
CHANGE_RATE_INTERVAL=300
-# DL Write Proxy Finagle Name
-DL_WP_FINAGLE_NAME='zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy'
-
##########
# Reader
##########
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
index 25b70a4..7114dbb 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/Benchmarker.java
@@ -265,8 +265,8 @@ public class Benchmarker {
}
Worker runWriter() {
- Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty(),
- "either serverset paths or finagle-names required");
+ Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
+ "either serverset paths, finagle-names or uri required");
Preconditions.checkArgument(msgSize > 0, "messagesize must be greater than 0");
Preconditions.checkArgument(rate > 0, "rate must be greater than 0");
Preconditions.checkArgument(maxRate >= rate, "max rate must be greater than rate");
@@ -278,6 +278,7 @@ public class Benchmarker {
new ShiftableRateLimiter(rate, maxRate, changeRate, changeRateSeconds, TimeUnit.SECONDS);
return createWriteWorker(
streamPrefix,
+ dlUri,
null == startStreamId ? shardId * numStreams : startStreamId,
null == endStreamId ? (shardId + 1) * numStreams : endStreamId,
rateLimiter,
@@ -299,6 +300,7 @@ public class Benchmarker {
protected WriterWorker createWriteWorker(
String streamPrefix,
+ URI uri,
int startStreamId,
int endStreamId,
ShiftableRateLimiter rateLimiter,
@@ -318,6 +320,7 @@ public class Benchmarker {
boolean enableBatching) {
return new WriterWorker(
streamPrefix,
+ uri,
startStreamId,
endStreamId,
rateLimiter,
@@ -360,8 +363,8 @@ public class Benchmarker {
}
Worker runReader() throws IOException {
- Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty(),
- "either serverset paths or finagle-names required");
+ Preconditions.checkArgument(!finagleNames.isEmpty() || !serversetPaths.isEmpty() || null != dlUri,
+ "either serverset paths, finagle-names or dlUri required");
Preconditions.checkArgument(concurrency > 0, "concurrency must be greater than 0");
Preconditions.checkArgument(truncationInterval > 0, "truncation interval should be greater than 0");
return runReaderInternal(serversetPaths, finagleNames, truncationInterval);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
index 4bc55d2..5b34939 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/ReaderWorker.java
@@ -254,7 +254,7 @@ public class ReaderWorker implements Worker {
builder = builder.finagleNameStrs(local, remotes);
LOG.info("Initialized distributedlog client for truncation @ {}.", finagleNames);
- } else {
+ } else if (serverSets.length != 0){
ServerSet local = this.serverSets[0].getServerSet();
ServerSet[] remotes = new ServerSet[this.serverSets.length - 1];
for (int i = 1; i < serverSets.length; i++) {
@@ -263,6 +263,9 @@ public class ReaderWorker implements Worker {
builder = builder.serverSets(local, remotes);
LOG.info("Initialized distributedlog client for truncation @ {}.", serverSetPaths);
+ } else {
+ builder = builder.uri(uri);
+ LOG.info("Initialized distributedlog client for namespace {}", uri);
}
dlc = builder.build();
} else {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
index 5a961df..92fc090 100644
--- a/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
+++ b/distributedlog-benchmark/src/main/java/com/twitter/distributedlog/benchmark/WriterWorker.java
@@ -65,6 +65,7 @@ public class WriterWorker implements Worker {
final int hostConnectionLimit;
final ExecutorService executorService;
final ShiftableRateLimiter rateLimiter;
+ final URI dlUri;
final DLZkServerSet[] serverSets;
final List<String> finagleNames;
final Random random;
@@ -86,6 +87,7 @@ public class WriterWorker implements Worker {
final StatsLogger dlErrorCodeLogger;
public WriterWorker(String streamPrefix,
+ URI uri,
int startStreamId,
int endStreamId,
ShiftableRateLimiter rateLimiter,
@@ -106,6 +108,7 @@ public class WriterWorker implements Worker {
Preconditions.checkArgument(startStreamId <= endStreamId);
Preconditions.checkArgument(!finagleNames.isEmpty() || !serverSetPaths.isEmpty());
this.streamPrefix = streamPrefix;
+ this.dlUri = uri;
this.startStreamId = startStreamId;
this.endStreamId = endStreamId;
this.rateLimiter = rateLimiter;
@@ -184,19 +187,21 @@ public class WriterWorker implements Worker {
.handshakeTracing(true)
.name("writer");
- if (serverSets.length == 0) {
+ if (!finagleNames.isEmpty()) {
String local = finagleNames.get(0);
String[] remotes = new String[finagleNames.size() - 1];
finagleNames.subList(1, finagleNames.size()).toArray(remotes);
builder = builder.finagleNameStrs(local, remotes);
- } else {
+ } else if (serverSets.length != 0){
ServerSet local = serverSets[0].getServerSet();
ServerSet[] remotes = new ServerSet[serverSets.length - 1];
for (int i = 1; i < serverSets.length; i++) {
remotes[i-1] = serverSets[i].getServerSet();
}
builder = builder.serverSets(local, remotes);
+ } else {
+ builder = builder.uri(dlUri);
}
return builder.build();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
index c4a82e5..5c6a54b 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/service/DistributedLogClientBuilder.java
@@ -32,11 +32,16 @@ import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId;
+import org.apache.commons.lang.StringUtils;
import java.net.SocketAddress;
+import java.net.URI;
+import java.util.Random;
public final class DistributedLogClientBuilder {
+ private static final Random random = new Random(System.currentTimeMillis());
+
private String _name = null;
private ClientId _clientId = null;
private RoutingService.Builder _routingServiceBuilder = null;
@@ -171,6 +176,29 @@ public final class DistributedLogClientBuilder {
}
/**
+ * URI to access proxy services. The write proxies are assumed to be announced under `.write_proxy`
+ * of the provided namespace uri. If the uri lists several zk servers (`,` or `;` separated), one is picked at random.
+ * <p>
+ * The builder converts the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to a
+ * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`).
+ *
+ * @param uri namespace uri to access the serverset of write proxies
+ * @return distributedlog builder
+ */
+ public DistributedLogClientBuilder uri(URI uri) {
+ DistributedLogClientBuilder newBuilder = newBuilder(this);
+ String zkServers = uri.getAuthority().replace(";", ",");
+ String[] zkServerList = StringUtils.split(zkServers, ',');
+ String finagleNameStr = String.format(
+ "zk!%s!%s/.write_proxy",
+ zkServerList[random.nextInt(zkServerList.length)], // zk server
+ uri.getPath());
+ newBuilder._routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
+ newBuilder._enableRegionStats = false;
+ return newBuilder;
+ }
+
+ /**
* Address of write proxy to connect.
*
* @param address
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/05a8daa2/docs/operations/deployment.rst
----------------------------------------------------------------------
diff --git a/docs/operations/deployment.rst b/docs/operations/deployment.rst
index 08191f4..41240db 100644
--- a/docs/operations/deployment.rst
+++ b/docs/operations/deployment.rst
@@ -464,9 +464,7 @@ Write Proxy Naming
++++++++++++++++++
The `dlog-daemon.sh` script starts the write proxy by announcing it to the `.write_proxy` path under
-the dl namespace. So you could use `zk!<zkservers>!/<namespace_path>/.write_proxy` as the finagle name
-to access the write proxy cluster. It is `zk!127.0.0.1:2181!/messaging/distributedlog/mynamespace/.write_proxy`
-in the above example.
+the dl namespace. So you could pass the namespace uri to the distributedlog client builder to access the write proxy cluster.
Verify the setup
++++++++++++++++