You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2016/12/21 08:00:09 UTC

[01/29] incubator-distributedlog git commit: when publishing per partition stats, also publish per stream stats

Repository: incubator-distributedlog
Updated Branches:
  refs/heads/merge/DL-98 [created] 28a8b8ff9


when publishing per partition stats, also publish per stream stats

RB_ID=820062


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/904b8986
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/904b8986
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/904b8986

Branch: refs/heads/merge/DL-98
Commit: 904b8986b46fc908cfd1a0ee05b35dce77d12f5c
Parents: 3fceccc
Author: Jordan Bull <jb...@twitter.com>
Authored: Fri May 13 11:27:19 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:16:37 2016 -0800

----------------------------------------------------------------------
 .../service/DistributedLogServiceImpl.java      | 11 +++++++----
 .../service/stream/BulkWriteOp.java             |  8 ++++++--
 .../service/stream/StreamImpl.java              |  4 ++--
 .../service/stream/StreamOpStats.java           | 20 ++++++++++++--------
 .../distributedlog/service/stream/WriteOp.java  |  8 ++++++--
 .../service/TestDistributedLogServer.java       |  8 ++++----
 .../service/TestDistributedLogService.java      |  1 +
 .../service/stream/TestStreamOp.java            |  3 ++-
 8 files changed, 40 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index de475ea..751e972 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -132,6 +132,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     // Stats
     private final StatsLogger statsLogger;
     private final StatsLogger perStreamStatsLogger;
+    private final StreamPartitionConverter streamPartitionConverter;
     private final StreamOpStats streamOpStats;
     private final Counter bulkWritePendingStat;
     private final Counter writePendingStat;
@@ -158,6 +159,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         this.perStreamStatsLogger = perStreamStatsLogger;
         this.dlsnVersion = serverConf.getDlsnVersion();
         this.serverRegionId = serverConf.getRegionId();
+        this.streamPartitionConverter = converter;
         int serverPort = serverConf.getServerPort();
         int shard = serverConf.getServerShardId();
         int numThreads = serverConf.getServerThreads();
@@ -396,8 +398,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) {
         bulkWritePendingStat.inc();
         receivedRecordCounter.add(data.size());
-        BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, getChecksum(ctx),
-            featureChecksumDisabled, accessControlManager);
+        BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
+            getChecksum(ctx), featureChecksumDisabled, accessControlManager);
         executeStreamOp(op);
         return op.result().ensure(new Function0<BoxedUnit>() {
             public BoxedUnit apply() {
@@ -675,8 +677,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                        ByteBuffer data,
                        Long checksum,
                        boolean isRecordSet) {
-        return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, serverConfig, dlsnVersion,
-            checksum, isRecordSet, featureChecksumDisabled, accessControlManager);
+        return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
+            serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled,
+            accessControlManager);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
index 96a37cd..c009bb9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
@@ -33,6 +33,8 @@ import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.RequestDeniedException;
 import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.streamset.Partition;
+import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.thrift.service.StatusCode;
@@ -88,6 +90,7 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements
                        List<ByteBuffer> buffers,
                        StatsLogger statsLogger,
                        StatsLogger perStreamStatsLogger,
+                       StreamPartitionConverter streamPartitionConverter,
                        Long checksum,
                        Feature checksumDisabledFeature,
                        AccessControlManager accessControlManager) {
@@ -100,6 +103,7 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements
         }
         this.payloadSize = total;
 
+        final Partition partition = streamPartitionConverter.convert(stream);
         // Write record stats
         StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
         this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite");
@@ -107,8 +111,8 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements
         this.failureRecordCounter = streamOpStats.recordsCounter("failure");
         this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
         this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes");
-        this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "bulkWrite");
-        this.bytes = streamOpStats.streamRequestCounter(stream, "bulkWrite", "bytes");
+        this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite");
+        this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes");
 
         this.accessControlManager = accessControlManager;
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 04f793f..45630fe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -194,12 +194,12 @@ public class StreamImpl implements Stream {
         this.dynConf = streamConf;
         StatsLogger limiterStatsLogger = BroadCastStatsLogger.two(
             streamOpStats.baseScope("stream_limiter"),
-            streamOpStats.streamRequestScope(name, "limiter"));
+            streamOpStats.streamRequestScope(partition, "limiter"));
         this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled);
         this.requestTimer = requestTimer;
 
         // Stats
-        this.streamLogger = streamOpStats.streamRequestStatsLogger(name);
+        this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
         this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
         this.streamExceptionStatLogger = streamLogger.scope("exceptions");
         this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
index adf89a3..2a44d88 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
@@ -17,6 +17,8 @@
  */
 package com.twitter.distributedlog.service.stream;
 
+import com.twitter.distributedlog.stats.BroadCastStatsLogger;
+import com.twitter.distributedlog.service.streamset.Partition;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
@@ -81,19 +83,21 @@ public class StreamOpStats {
         return recordsStatsLogger.getCounter(counterName);
     }
 
-    public StatsLogger streamRequestStatsLogger(String streamName) {
-        return streamStatsLogger.scope(streamName);
+    public StatsLogger streamRequestStatsLogger(Partition partition) {
+        return BroadCastStatsLogger.masterslave(
+            streamStatsLogger.scope(partition.getStream()).scope("partition").scope(Integer.toString(partition.getId())),
+            streamStatsLogger.scope(partition.getStream()).scope("aggregate"));
     }
 
-    public StatsLogger streamRequestScope(String streamName, String scopeName) {
-        return streamRequestStatsLogger(streamName).scope(scopeName);
+    public StatsLogger streamRequestScope(Partition partition, String scopeName) {
+        return streamRequestStatsLogger(partition).scope(scopeName);
     }
 
-    public OpStatsLogger streamRequestLatencyStat(String streamName, String opName) {
-        return streamRequestStatsLogger(streamName).getOpStatsLogger(opName);
+    public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) {
+        return streamRequestStatsLogger(partition).getOpStatsLogger(opName);
     }
 
-    public Counter streamRequestCounter(String streamName, String opName, String counterName) {
-        return streamRequestScope(streamName, opName).getCounter(counterName);
+    public Counter streamRequestCounter(Partition partition, String opName, String counterName) {
+        return streamRequestScope(partition, opName).getCounter(counterName);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
index c74f2cd..e9f2f4e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
@@ -26,6 +26,8 @@ import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.RequestDeniedException;
 import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.streamset.Partition;
+import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.thrift.service.StatusCode;
@@ -67,6 +69,7 @@ public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {
                    ByteBuffer data,
                    StatsLogger statsLogger,
                    StatsLogger perStreamStatsLogger,
+                   StreamPartitionConverter streamPartitionConverter,
                    ServerConfiguration conf,
                    byte dlsnVersion,
                    Long checksum,
@@ -78,14 +81,15 @@ public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {
         data.get(payload);
         this.isRecordSet = isRecordSet;
 
+        final Partition partition = streamPartitionConverter.convert(stream);
         StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
         this.successRecordCounter = streamOpStats.recordsCounter("success");
         this.failureRecordCounter = streamOpStats.recordsCounter("failure");
         this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
         this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write");
         this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes");
-        this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "write");
-        this.bytes = streamOpStats.streamRequestCounter(stream, "write", "bytes");
+        this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write");
+        this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes");
 
         this.dlsnVersion = dlsnVersion;
         this.accessControlManager = accessControlManager;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
index 94e8755..63723ef 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServer.java
@@ -106,9 +106,9 @@ public class TestDistributedLogServer extends DistributedLogServerTestCase {
         int numRead = 0;
         LogRecord r = reader.readNext(false);
         while (null != r) {
-            int i = Integer.parseInt(new String(r.getPayload()));
-            assertEquals(numRead + 1, i);
             ++numRead;
+            int i = Integer.parseInt(new String(r.getPayload()));
+            assertEquals(numRead, i);
             r = reader.readNext(false);
         }
         assertEquals(10, numRead);
@@ -121,7 +121,7 @@ public class TestDistributedLogServer extends DistributedLogServerTestCase {
      */
     @Test(timeout = 60000)
     public void testChecksumFlag() throws Exception {
-        String name = "dlserver-basic-write";
+        String name = "testChecksumFlag";
         LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
         routingService.addHost(name, dlServer.getAddress());
         DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
@@ -134,7 +134,7 @@ public class TestDistributedLogServer extends DistributedLogServerTestCase {
                 .connectionTimeout(Duration.fromSeconds(1))
                 .requestTimeout(Duration.fromSeconds(60)))
             .checksum(false);
-        DistributedLogClient dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
+        DistributedLogClient dlClient = dlClientBuilder.build();
         Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes())));
         dlClient.close();
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index ed456b9..17fae4a 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -600,6 +600,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
             ByteBuffer.wrap("test".getBytes()),
             new NullStatsLogger(),
             new NullStatsLogger(),
+            new IdentityStreamPartitionConverter(),
             new ServerConfiguration(),
             (byte)0,
             checksum,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/904b8986/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
index 93d900f..41b4c69 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
@@ -24,7 +24,7 @@ import com.twitter.distributedlog.acl.DefaultAccessControlManager;
 import com.twitter.distributedlog.exceptions.InternalServerException;
 import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
-import com.twitter.distributedlog.service.stream.WriteOp;
+import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.distributedlog.thrift.service.StatusCode;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
@@ -67,6 +67,7 @@ public class TestStreamOp {
             ByteBuffer.wrap("test".getBytes()),
             new NullStatsLogger(),
             new NullStatsLogger(),
+            new IdentityStreamPartitionConverter(),
             new ServerConfiguration(),
             (byte)0,
             null,


[23/29] incubator-distributedlog git commit: Merge branch 'merge/DL-94' into merge/DL-95

Posted by si...@apache.org.
Merge branch 'merge/DL-94' into merge/DL-95


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/749657f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/749657f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/749657f7

Branch: refs/heads/merge/DL-98
Commit: 749657f7dc35e121161d6f162b250dd5b02741a5
Parents: f4f633f 2879090
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:53:10 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:53:10 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 .travis.yml                                     |   4 +
 Dockerfile                                      |  10 +-
 Vagrantfile                                     |   5 +-
 distributedlog-benchmark/bin/bundle             |   7 +-
 distributedlog-benchmark/bin/dbench             |  15 +-
 distributedlog-benchmark/conf/dlogenv.sh        |   2 +-
 .../routing/ConsistentHashRoutingService.java   |   2 +-
 distributedlog-core/bin/dlog                    |  12 +-
 distributedlog-core/conf/write_proxy.conf       |   2 +-
 .../distributedlog/DistributedLogConstants.java |   2 +-
 distributedlog-service/bin/bundle               |   7 +-
 distributedlog-service/bin/dlog                 |  10 +-
 distributedlog-service/bin/dlog-daemon.sh       | 170 +++++++++----------
 .../distributedlog-kafka/bin/runner             |   1 -
 docker/Dockerfile                               |  35 ----
 docs/admin_guide/vagrant.rst                    |   4 +-
 docs/basics/introduction.rst                    |  14 +-
 docs/deployment/cluster.rst                     |  58 ++++---
 docs/deployment/docker.rst                      |   2 +-
 pom.xml                                         |  11 +-
 scripts/bundle                                  |   2 -
 scripts/common.sh                               |  10 +-
 scripts/integration/smoketest.sh                | 113 ++++++++++++
 scripts/runner                                  |  12 +-
 scripts/snapshot                                |  11 +-
 vagrant/base.sh                                 |  43 ++---
 vagrant/bk.sh                                   |  64 +++----
 vagrant/zk.sh                                   |   5 +-
 29 files changed, 362 insertions(+), 275 deletions(-)
----------------------------------------------------------------------



[07/29] incubator-distributedlog git commit: dl: add flag to enable thrift mux on DL Client

Posted by si...@apache.org.
dl: add flag to enable thrift mux on DL Client

RB_ID=839555


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/d3a97bc0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/d3a97bc0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/d3a97bc0

Branch: refs/heads/merge/DL-98
Commit: d3a97bc0dde0c25516840725599ac46fa03601ab
Parents: 98dc9ab
Author: Dave Rusek <dr...@twitter.com>
Authored: Mon Jun 6 16:50:25 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:39:04 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/service/MonitorService.java  | 28 ++++++++++++++------
 .../service/MonitorServiceApp.java              |  1 +
 2 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d3a97bc0/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
index 2683b47..6b58eff 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
@@ -85,6 +85,7 @@ public class MonitorService implements NamespaceListener {
     private int heartbeatEveryChecks = 0;
     private int instanceId = -1;
     private int totalInstances = -1;
+    private boolean isThriftMux = false;
 
     // Options
     private final Optional<String> uriArg;
@@ -98,6 +99,7 @@ public class MonitorService implements NamespaceListener {
     private final Optional<Integer> heartbeatEveryChecksArg;
     private final Optional<Boolean> handshakeWithClientInfoArg;
     private final Optional<Boolean> watchNamespaceChangesArg;
+    private final Optional<Boolean> isThriftMuxArg;
 
     // Stats
     private final StatsProvider statsProvider;
@@ -224,6 +226,7 @@ public class MonitorService implements NamespaceListener {
                    Optional<Integer> heartbeatEveryChecksArg,
                    Optional<Boolean> handshakeWithClientInfoArg,
                    Optional<Boolean> watchNamespaceChangesArg,
+                   Optional<Boolean> isThriftMuxArg,
                    StatsReceiver statsReceiver,
                    StatsProvider statsProvider) {
         // options
@@ -238,6 +241,7 @@ public class MonitorService implements NamespaceListener {
         this.heartbeatEveryChecksArg = heartbeatEveryChecksArg;
         this.handshakeWithClientInfoArg = handshakeWithClientInfoArg;
         this.watchNamespaceChangesArg = watchNamespaceChangesArg;
+        this.isThriftMuxArg = isThriftMuxArg;
 
         // Stats
         this.statsReceiver = statsReceiver;
@@ -275,6 +279,7 @@ public class MonitorService implements NamespaceListener {
         }
         handshakeWithClientInfo = handshakeWithClientInfoArg.isPresent();
         watchNamespaceChanges = watchNamespaceChangesArg.isPresent();
+        isThriftMux = isThriftMuxArg.isPresent();
         URI uri = URI.create(uriArg.get());
         DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
         if (confFileArg.isPresent()) {
@@ -300,8 +305,22 @@ public class MonitorService implements NamespaceListener {
         ServerSet[] remotes  = new ServerSet[serverSets.length - 1];
         System.arraycopy(serverSets, 1, remotes, 0, remotes.length);
 
+        ClientBuilder finagleClientBuilder = ClientBuilder.get()
+            .connectTimeout(Duration.fromSeconds(1))
+            .tcpConnectTimeout(Duration.fromSeconds(1))
+            .requestTimeout(Duration.fromSeconds(2))
+            .keepAlive(true)
+            .failFast(false);
+
+        if (!isThriftMux) {
+            finagleClientBuilder = finagleClientBuilder
+                .hostConnectionLimit(2)
+                .hostConnectionCoresize(2);
+        }
+
         dlClient = DistributedLogClientBuilder.newBuilder()
                 .name("monitor")
+                .thriftmux(isThriftMux)
                 .clientId(ClientId$.MODULE$.apply("monitor"))
                 .redirectBackoffMaxMs(50)
                 .redirectBackoffStartMs(100)
@@ -310,14 +329,7 @@ public class MonitorService implements NamespaceListener {
                 .serverSets(local, remotes)
                 .streamNameRegex(streamRegex)
                 .handshakeWithClientInfo(handshakeWithClientInfo)
-                .clientBuilder(ClientBuilder.get()
-                        .connectTimeout(Duration.fromSeconds(1))
-                        .tcpConnectTimeout(Duration.fromSeconds(1))
-                        .requestTimeout(Duration.fromSeconds(2))
-                        .hostConnectionLimit(2)
-                        .hostConnectionCoresize(2)
-                        .keepAlive(true)
-                        .failFast(false))
+                .clientBuilder(finagleClientBuilder)
                 .statsReceiver(monitorReceiver.scope("client"))
                 .buildMonitorClient();
         runMonitor(dlConf, uri);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/d3a97bc0/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
index 90d3566..a51a6a9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
@@ -99,6 +99,7 @@ public class MonitorServiceApp {
                 getOptionalIntegerArg(cmdline, "hck"),
                 getOptionalBooleanArg(cmdline, "hsci"),
                 getOptionalBooleanArg(cmdline, "w"),
+                getOptionalBooleanArg(cmdline, "mx"),
                 statsReceiver,
                 statsProvider);
 


[03/29] incubator-distributedlog git commit: Improve handling of lock conflicts in zk session lock

Posted by si...@apache.org.
Improve handling of lock conflicts in zk session lock

- lock reacquire could happen in both the foreground and background threads, so use a semaphore to make sure there is only one outstanding acquire operation, and check whether the lock is already held before reacquiring.
- fix the zk sibling znode handling logic: as the znode is a sequential znode, its name is different each time, so only compare the client id and session id of the znodes.

RB_ID=833945


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/7b46a9ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/7b46a9ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/7b46a9ac

Branch: refs/heads/merge/DL-98
Commit: 7b46a9ac6bb5d520366069c244332347ef019e8e
Parents: 0091960
Author: Sijie Guo <si...@twitter.com>
Authored: Mon May 23 16:49:19 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:28:58 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/lock/ZKSessionLock.java      | 35 ++++++++++++--
 .../distributedlog/lock/TestZKSessionLock.java  | 49 ++++++++++++++++++++
 2 files changed, 80 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7b46a9ac/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
index 87894dc..dc57d55 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/lock/ZKSessionLock.java
@@ -480,6 +480,31 @@ class ZKSessionLock implements SessionLock {
         return id;
     }
 
+    static boolean areLockWaitersInSameSession(String node1, String node2) {
+        String[] parts1 = node1.split("_");
+        String[] parts2 = node2.split("_");
+        if (parts1.length != 4 || parts2.length != 4) {
+            return node1.equals(node2);
+        }
+        if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) {
+            return node1.equals(node2);
+        }
+        long sessionOwner1 = Long.parseLong(parts1[2].substring(1));
+        long sessionOwner2 = Long.parseLong(parts2[2].substring(1));
+        if (sessionOwner1 != sessionOwner2) {
+            return false;
+        }
+        String clientId1, clientId2;
+        try {
+            clientId1 = URLDecoder.decode(parts1[1], UTF_8.name());
+            clientId2 = URLDecoder.decode(parts2[1], UTF_8.name());
+            return clientId1.equals(clientId2);
+        } catch (UnsupportedEncodingException e) {
+            // if failed to parse client id, we have to get client id by zookeeper#getData.
+            return node1.equals(node2);
+        }
+    }
+
     /**
      * Get client id and its ephemeral owner.
      *
@@ -1209,17 +1234,19 @@ class ZKSessionLock implements SessionLock {
             @Override
             public void execute() {
                 boolean shouldWatch;
+                final boolean shouldClaimOwnership;
                 if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) {
                     // if the current owner is the znode left from previous session
                     // we should watch it and claim ownership
                     shouldWatch = true;
+                    shouldClaimOwnership = true;
                     LOG.info("LockWatcher {} for {} found its previous session {} held lock, watch it to claim ownership.",
                             new Object[] { myNode, lockPath, currentOwner });
-                } else if (lockId.compareTo(currentOwner) == 0 && siblingNode.equals(ownerNode)) {
+                } else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) {
                     // I found that my sibling is the current owner with same lock id (client id & session id)
                     // It must be left by any race condition from same zookeeper client
-                    // I would watch owner instead of sibling
                     shouldWatch = true;
+                    shouldClaimOwnership = true;
                     LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {}, watch it to claim ownership.",
                             new Object[]{myNode, lockPath, lockId, siblingNode});
                 } else {
@@ -1230,6 +1257,7 @@ class ZKSessionLock implements SessionLock {
                                     new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()});
                         }
                     }
+                    shouldClaimOwnership = false;
                 }
 
                 // watch sibling for lock ownership
@@ -1247,8 +1275,7 @@ class ZKSessionLock implements SessionLock {
                                     }
 
                                     if (KeeperException.Code.OK.intValue() == rc) {
-                                        if (siblingNode.equals(ownerNode) &&
-                                                (lockId.compareTo(currentOwner) == 0 || lockContext.hasLockId(currentOwner))) {
+                                        if (shouldClaimOwnership) {
                                             // watch owner successfully
                                             LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.",
                                                     new Object[]{ myNode, lockPath, ownerNode });

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/7b46a9ac/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
index 629538e..054d714 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/lock/TestZKSessionLock.java
@@ -180,6 +180,28 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
     }
 
     @Test(timeout = 60000)
+    public void testAreLockWaitersInSameSession() throws Exception {
+        ZooKeeper zk = zkc.get();
+
+        String lockPath = "/test-are-lock-waiters-in-same-session";
+        String clientId1 = "test-are-lock-waiters-in-same-session-1";
+        String clientId2 = "test-are-lock-waiters-in-same-session-2";
+
+        createLockPath(zk, lockPath);
+
+        String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
+        String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId2));
+        String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
+
+        assertEquals(node1 + " and " + node3 + " should be in same session.",
+                true, areLockWaitersInSameSession(node1, node3));
+        assertEquals(node1 + " and " + node2 + " should be not in same session.",
+                false, areLockWaitersInSameSession(node1, node2));
+        assertEquals(node3 + " and " + node2 + " should be not in same session.",
+                false, areLockWaitersInSameSession(node3, node2));
+    }
+
+    @Test(timeout = 60000)
     public void testExecuteLockAction() throws Exception {
         String lockPath = "/test-execute-lock-action";
         String clientId = "test-execute-lock-action-" + System.currentTimeMillis();
@@ -921,6 +943,33 @@ public class TestZKSessionLock extends ZooKeeperClusterTestCase {
         lock1_1.unlock();
     }
 
+    @Test(timeout = 60000)
+    public void testLockWithMultipleSiblingWaiters() throws Exception {
+        String lockPath = "/test-lock-with-multiple-sibling-waiters";
+        String clientId = "client-id";
+
+        createLockPath(zkc.get(), lockPath);
+
+        final ZKSessionLock lock0 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+        final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+        final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
+
+        lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+        lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+
+        List<String> children = awaitWaiters(3, zkc, lockPath);
+
+        assertEquals(3, children.size());
+        assertEquals(State.CLAIMED, lock0.getLockState());
+        assertEquals(State.CLAIMED, lock1.getLockState());
+        assertEquals(State.CLAIMED, lock2.getLockState());
+
+        lock0.unlock();
+        lock1.unlock();
+        lock2.unlock();
+    }
+
     /**
      * Immediate lock and unlock first lock
      * @throws Exception


[10/29] incubator-distributedlog git commit: Simplify the state transition on stream

Posted by si...@apache.org.
Simplify the state transition on stream

* the stream is created on INITIALIZING
* when the stream is started, it would start transition from INITIALIZING to INITIALIZED
* it would serve stream operations when the stream is INITIALIZED
* it would be turned to ERROR when it encounters exceptions.
* the stream would be closed when a service operation times out or when it encounters any exception. it would first be removed from the acquired mapping
* the stream would be removed from the cached mapping depending on the probation time.

    RB_ID=848047


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f19e7564
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f19e7564
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f19e7564

Branch: refs/heads/merge/DL-98
Commit: f19e7564ff4a1ec1b5d6f2683db190d739df99bb
Parents: 0a18f56
Author: Leigh Stewart <ls...@twitter.com>
Authored: Mon Dec 12 16:49:26 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:49:26 2016 -0800

----------------------------------------------------------------------
 .../service/DistributedLogServiceImpl.java      |  27 +-
 .../service/stream/StreamImpl.java              | 550 +++++++------------
 .../service/stream/StreamManager.java           |   5 +-
 .../service/stream/StreamManagerImpl.java       |  15 +-
 .../service/TestDistributedLogService.java      |  20 +-
 5 files changed, 222 insertions(+), 395 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 751e972..3a9b904 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -90,7 +90,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -378,7 +377,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
                     // if it is closed, we would not acquire stream again.
                     return null;
                 }
-                writer = streamManager.getOrCreateStream(stream);
+                writer = streamManager.getOrCreateStream(stream, true);
             } finally {
                 closeLock.readLock().unlock();
             }
@@ -631,26 +630,6 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         logger.info("Released KeepAlive Latch. Main thread will shut the service down.");
     }
 
-    @VisibleForTesting
-    java.util.concurrent.Future<?> schedule(Runnable runnable, long delayMs) {
-        closeLock.readLock().lock();
-        try {
-            if (serverStatus != ServerStatus.WRITE_AND_ACCEPT) {
-                return null;
-            } else if (delayMs > 0) {
-                return scheduler.schedule(runnable, delayMs, TimeUnit.MILLISECONDS);
-            } else {
-                return scheduler.submit(runnable);
-            }
-        } catch (RejectedExecutionException ree) {
-            logger.error("Failed to schedule task {} in {} ms : ",
-                    new Object[] { runnable, delayMs, ree });
-            return null;
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
     // Test methods.
 
     private DynamicDistributedLogConfiguration getDynConf(String streamName) {
@@ -664,8 +643,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     }
 
     @VisibleForTesting
-    Stream newStream(String name) {
-        return streamFactory.create(name, getDynConf(name), streamManager);
+    Stream newStream(String name) throws IOException {
+        return streamManager.getOrCreateStream(name, false);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 1204d39..3d5b9e7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -26,7 +26,6 @@ import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.DLException;
-import com.twitter.distributedlog.exceptions.InvalidStreamNameException;
 import com.twitter.distributedlog.exceptions.OverCapacityException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.StreamNotReadyException;
@@ -70,24 +69,23 @@ import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class StreamImpl implements Stream {
     static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
 
+    /**
+     * The status of the stream.
+     *
+     * The status change of the stream should just go in one direction. If a stream hits
+     * any error, the stream should be put in error state. If a stream is in error state,
+     * it should be removed and not reused anymore.
+     */
     public static enum StreamStatus {
         UNINITIALIZED(-1),
         INITIALIZING(0),
         INITIALIZED(1),
-        // if a stream is in failed state, it could be retried immediately.
-        // a stream will be put in failed state when encountered any stream exception.
-        FAILED(-2),
-        // if a stream is in backoff state, it would backoff for a while.
-        // a stream will be put in backoff state when failed to acquire the ownership.
-        BACKOFF(-3),
         CLOSING(-4),
         CLOSED(-5),
         // if a stream is in error state, it should be abort during closing.
@@ -112,26 +110,15 @@ public class StreamImpl implements Stream {
     private final Partition partition;
     private DistributedLogManager manager;
 
-    // A write has been attempted since the last stream acquire.
-    private volatile boolean writeSinceLastAcquire = false;
     private volatile AsyncLogWriter writer;
     private volatile StreamStatus status;
     private volatile String owner;
     private volatile Throwable lastException;
-    private volatile boolean running = true;
-    private volatile boolean suspended = false;
     private volatile Queue<StreamOp> pendingOps = new ArrayDeque<StreamOp>();
 
     private final Promise<Void> closePromise = new Promise<Void>();
     private final Object txnLock = new Object();
     private final TimeSequencer sequencer = new TimeSequencer();
-    // last acquire time
-    private final Stopwatch lastAcquireWatch = Stopwatch.createUnstarted();
-    // last acquire failure time
-    private final Stopwatch lastAcquireFailureWatch = Stopwatch.createUnstarted();
-    private final long nextAcquireWaitTimeMs;
-    private ScheduledFuture<?> tryAcquireScheduledFuture = null;
-    private long scheduledAcquireDelayMs = 0L;
     private final StreamRequestLimiter limiter;
     private final DynamicDistributedLogConfiguration dynConf;
     private final DistributedLogConfiguration dlConfig;
@@ -165,7 +152,7 @@ public class StreamImpl implements Stream {
         new ConcurrentHashMap<String, Counter>();
 
     // Since we may create and discard streams at initialization if there's a race,
-    // must not do any expensive intialization here (particularly any locking or
+    // must not do any expensive initialization here (particularly any locking or
     // significant resource allocation etc.).
     StreamImpl(final String name,
                final Partition partition,
@@ -189,7 +176,6 @@ public class StreamImpl implements Stream {
         this.partition = partition;
         this.status = StreamStatus.UNINITIALIZED;
         this.lastException = new IOException("Fail to write record to stream " + name);
-        this.nextAcquireWaitTimeMs = dlConfig.getZKSessionTimeoutMilliseconds() * 3 / 5;
         this.streamConfigProvider = streamConfigProvider;
         this.dlNamespace = dlNamespace;
         this.featureRateLimitDisabled = featureProvider.getFeature(
@@ -275,54 +261,16 @@ public class StreamImpl implements Stream {
         return String.format("Stream:%s, %s, %s Status:%s", name, manager, writer, status);
     }
 
-    // schedule stream acquistion
-    private void tryAcquireStreamOnce() {
-        if (!running) {
-            return;
-        }
-
-        boolean needAcquire = false;
-        boolean checkNextTime = false;
-        synchronized (this) {
-            switch (this.status) {
-            case INITIALIZING:
-                streamManager.notifyReleased(this);
-                needAcquire = true;
-                break;
-            case FAILED:
-                this.status = StreamStatus.INITIALIZING;
-                streamManager.notifyReleased(this);
-                needAcquire = true;
-                break;
-            case BACKOFF:
-                // We may end up here after timeout on streamLock. To avoid acquire on every timeout
-                // we should only try again if a write has been attempted since the last acquire
-                // attempt. If we end up here because the request handler woke us up, the flag will
-                // be set and we will try to acquire as intended.
-                if (writeSinceLastAcquire) {
-                    this.status = StreamStatus.INITIALIZING;
-                    streamManager.notifyReleased(this);
-                    needAcquire = true;
-                } else {
-                    checkNextTime = true;
-                }
-                break;
-            default:
-                break;
-            }
-        }
-        if (needAcquire) {
-            lastAcquireWatch.reset().start();
-            acquireStream().addEventListener(new FutureEventListener<Boolean>() {
+    @Override
+    public void start() {
+        // acquire the stream
+        acquireStream().addEventListener(new FutureEventListener<Boolean>() {
                 @Override
                 public void onSuccess(Boolean success) {
-                    synchronized (StreamImpl.this) {
-                        scheduledAcquireDelayMs = 0L;
-                        tryAcquireScheduledFuture = null;
-                    }
                     if (!success) {
-                        // schedule acquire in nextAcquireWaitTimeMs
-                        scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
+                        // failed to acquire the stream. set the stream in error status and close it.
+                        setStreamInErrorStatus();
+                        requestClose("Failed to acquire the ownership");
                     }
                 }
 
@@ -330,65 +278,40 @@ public class StreamImpl implements Stream {
                 public void onFailure(Throwable cause) {
                     // unhandled exceptions
                     logger.error("Stream {} threw unhandled exception : ", name, cause);
+                    // failed to acquire the stream. set the stream in error status and close it.
                     setStreamInErrorStatus();
                     requestClose("Unhandled exception");
                 }
             });
-        } else if (StreamStatus.isUnavailable(status)) {
-            // if the stream is unavailable, stop the thread and close the stream
-            requestClose("Stream is unavailable anymore");
-        } else if (StreamStatus.INITIALIZED != status && lastAcquireWatch.elapsed(TimeUnit.HOURS) > 2) {
-            // if the stream isn't in initialized state and no writes coming in, then close the stream
-            requestClose("Stream not used anymore");
-        } else if (checkNextTime) {
-            synchronized (StreamImpl.this) {
-                scheduledAcquireDelayMs = 0L;
-                tryAcquireScheduledFuture = null;
-            }
-            // schedule acquire in nextAcquireWaitTimeMs
-            scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
-        }
     }
 
-    private synchronized void scheduleTryAcquireOnce(long delayMs) {
-        if (null != tryAcquireScheduledFuture) {
-            if (delayMs <= 0) {
-                if (scheduledAcquireDelayMs <= 0L ||
-                        (scheduledAcquireDelayMs > 0L
-                                && !tryAcquireScheduledFuture.cancel(false))) {
-                    return;
-                }
-                // if the scheduled one could be cancelled, re-submit one
-            } else {
-                return;
+    //
+    // Stats Operations
+    //
+
+    void countException(Throwable t, StatsLogger streamExceptionLogger) {
+        String exceptionName = null == t ? "null" : t.getClass().getName();
+        Counter counter = exceptionCounters.get(exceptionName);
+        if (null == counter) {
+            counter = exceptionStatLogger.getCounter(exceptionName);
+            Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
+            if (null != oldCounter) {
+                counter = oldCounter;
             }
         }
-        tryAcquireScheduledFuture = schedule(new Runnable() {
-            @Override
-            public void run() {
-                tryAcquireStreamOnce();
-            }
-        }, delayMs);
-        scheduledAcquireDelayMs = delayMs;
+        counter.inc();
+        streamExceptionLogger.getCounter(exceptionName).inc();
     }
 
-    @Override
-    public void start() {
-        scheduleTryAcquireOnce(0);
+    boolean isCriticalException(Throwable cause) {
+        return !(cause instanceof OwnershipAcquireFailedException);
     }
 
-    ScheduledFuture<?> schedule(Runnable runnable, long delayMs) {
-        if (!running) {
-            return null;
-        }
-        try {
-            return scheduler.schedule(name, runnable, delayMs, TimeUnit.MILLISECONDS);
-        } catch (RejectedExecutionException ree) {
-            logger.error("Failed to schedule task {} in {} ms : ",
-                    new Object[] { runnable, delayMs, ree });
-            return null;
-        }
-    }
+    //
+    // Service Timeout:
+    // - schedule a timeout function to handle operation timeouts: {@link #handleServiceTimeout(String)}
+    // - if the operation is completed within timeout period, cancel the timeout.
+    //
 
     void scheduleTimeout(final StreamOp op) {
         final Timeout timeout = requestTimer.newTimeout(new TimerTask() {
@@ -418,12 +341,14 @@ public class StreamImpl implements Stream {
      * stream off the proxy for a period of time, hopefully long enough for the
      * issues to be resolved, or for whoop to kick in and kill the shard.
      */
-    synchronized void handleServiceTimeout(String reason) {
-        if (StreamStatus.isUnavailable(status)) {
-            return;
+    void handleServiceTimeout(String reason) {
+        synchronized (this) {
+            if (StreamStatus.isUnavailable(status)) {
+                return;
+            }
+            // Mark stream in error state
+            setStreamInErrorStatus();
         }
-        // Mark stream in error state
-        setStreamInErrorStatus();
 
         // Async close request, and schedule eviction when its done.
         Future<Void> closeFuture = requestClose(reason, false /* dont remove */);
@@ -436,6 +361,10 @@ public class StreamImpl implements Stream {
         });
     }
 
+    //
+    // Submit the operation to the stream.
+    //
+
     /**
      * Execute the StreamOp. If reacquire is needed, this may initiate reacquire and queue the op for
      * execution once complete.
@@ -445,9 +374,6 @@ public class StreamImpl implements Stream {
      */
     @Override
     public void submit(StreamOp op) {
-        // Let stream acquire thread know a write has been attempted.
-        writeSinceLastAcquire = true;
-
         try {
             limiter.apply(op);
         } catch (OverCapacityException ex) {
@@ -460,36 +386,28 @@ public class StreamImpl implements Stream {
             scheduleTimeout(op);
         }
 
-        boolean notifyAcquireThread = false;
         boolean completeOpNow = false;
         boolean success = true;
         if (StreamStatus.isUnavailable(status)) {
             // Stream is closed, fail the op immediately
             op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
             return;
-        } if (StreamStatus.INITIALIZED == status && writer != null) {
+        } else if (StreamStatus.INITIALIZED == status && writer != null) {
             completeOpNow = true;
             success = true;
         } else {
             synchronized (this) {
                 if (StreamStatus.isUnavailable(status)) {
-                    // complete the write op as {@link #executeOp(op, success)} will handle closed case.
-                    completeOpNow = true;
-                    success = true;
+                    // Stream is closed, fail the op immediately
+                    op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
+                    return;
                 } if (StreamStatus.INITIALIZED == status) {
                     completeOpNow = true;
                     success = true;
-                } else if (StreamStatus.BACKOFF == status &&
-                        lastAcquireFailureWatch.elapsed(TimeUnit.MILLISECONDS) < nextAcquireWaitTimeMs) {
-                    completeOpNow = true;
-                    success = false;
                 } else if (failFastOnStreamNotReady) {
-                    notifyAcquireThread = true;
-                    completeOpNow = false;
-                    success = false;
                     op.fail(new StreamNotReadyException("Stream " + name + " is not ready; status = " + status));
-                } else { // closing & initializing
-                    notifyAcquireThread = true;
+                    return;
+                } else { // the stream is still initializing
                     pendingOps.add(op);
                     pendingOpsCounter.inc();
                     if (1 == pendingOps.size()) {
@@ -500,14 +418,15 @@ public class StreamImpl implements Stream {
                 }
             }
         }
-        if (notifyAcquireThread && !suspended) {
-            scheduleTryAcquireOnce(0L);
-        }
         if (completeOpNow) {
             executeOp(op, success);
         }
     }
 
+    //
+    // Execute operations and handle exceptions on operations
+    //
+
     /**
      * Execute the <i>op</i> immediately.
      *
@@ -516,20 +435,7 @@ public class StreamImpl implements Stream {
      * @param success
      *          whether the operation is success or not.
      */
-    void executeOp(StreamOp op, boolean success) {
-        closeLock.readLock().lock();
-        try {
-            if (StreamStatus.isUnavailable(status)) {
-                op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
-                return;
-            }
-            doExecuteOp(op, success);
-        } finally {
-            closeLock.readLock().unlock();
-        }
-    }
-
-    private void doExecuteOp(final StreamOp op, boolean success) {
+    void executeOp(final StreamOp op, boolean success) {
         final AsyncLogWriter writer;
         final Throwable lastException;
         synchronized (this) {
@@ -552,7 +458,7 @@ public class StreamImpl implements Stream {
                         case FOUND:
                             assert(cause instanceof OwnershipAcquireFailedException);
                             countAsException = false;
-                            handleOwnershipAcquireFailedException(op, (OwnershipAcquireFailedException) cause);
+                            handleExceptionOnStreamOp(op, cause);
                             break;
                         case ALREADY_CLOSED:
                             assert(cause instanceof AlreadyClosedException);
@@ -573,13 +479,14 @@ public class StreamImpl implements Stream {
                         case OVER_CAPACITY:
                             op.fail(cause);
                             break;
-                        // exceptions that *could* / *might* be recovered by creating a new writer
+                        // the DL writer hits exception, simple set the stream to error status
+                        // and fail the request
                         default:
-                            handleRecoverableDLException(op, cause);
+                            handleExceptionOnStreamOp(op, cause);
                             break;
                         }
                     } else {
-                        handleUnknownException(op, cause);
+                        handleExceptionOnStreamOp(op, cause);
                     }
                     if (countAsException) {
                         countException(cause, streamExceptionStatLogger);
@@ -587,88 +494,41 @@ public class StreamImpl implements Stream {
                 }
             });
         } else {
-            op.fail(lastException);
-        }
-    }
-
-    /**
-     * Handle recoverable dl exception.
-     *
-     * @param op
-     *          stream operation executing
-     * @param cause
-     *          exception received when executing <i>op</i>
-     */
-    private void handleRecoverableDLException(StreamOp op, final Throwable cause) {
-        AsyncLogWriter oldWriter = null;
-        boolean statusChanged = false;
-        synchronized (this) {
-            if (StreamStatus.INITIALIZED == status) {
-                oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED,
-                        null, null, cause);
-                statusChanged = true;
+            if (null != lastException) {
+                op.fail(lastException);
+            } else {
+                op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
             }
         }
-        if (statusChanged) {
-            Abortables.asyncAbort(oldWriter, false);
-            logger.error("Failed to write data into stream {} : ", name, cause);
-            scheduleTryAcquireOnce(0L);
-        }
-        op.fail(cause);
     }
 
     /**
-     * Handle unknown exception when executing <i>op</i>.
+     * Handle exception when executing <i>op</i>.
      *
      * @param op
      *          stream operation executing
      * @param cause
      *          exception received when executing <i>op</i>
      */
-    private void handleUnknownException(StreamOp op, final Throwable cause) {
+    private void handleExceptionOnStreamOp(StreamOp op, final Throwable cause) {
         AsyncLogWriter oldWriter = null;
         boolean statusChanged = false;
         synchronized (this) {
             if (StreamStatus.INITIALIZED == status) {
-                oldWriter = setStreamStatus(StreamStatus.FAILED, StreamStatus.INITIALIZED,
-                        null, null, cause);
+                oldWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, cause);
                 statusChanged = true;
             }
         }
         if (statusChanged) {
             Abortables.asyncAbort(oldWriter, false);
-            logger.error("Failed to write data into stream {} : ", name, cause);
-            scheduleTryAcquireOnce(0L);
-        }
-        op.fail(cause);
-    }
-
-    /**
-     * Handle losing ownership during executing <i>op</i>.
-     *
-     * @param op
-     *          stream operation executing
-     * @param oafe
-     *          the ownership exception received when executing <i>op</i>
-     */
-    private void handleOwnershipAcquireFailedException(StreamOp op, final OwnershipAcquireFailedException oafe) {
-        logger.warn("Failed to write data into stream {} because stream is acquired by {} : {}",
-                new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
-        AsyncLogWriter oldWriter = null;
-        boolean statusChanged = false;
-        synchronized (this) {
-            if (StreamStatus.INITIALIZED == status) {
-                oldWriter =
-                    setStreamStatus(StreamStatus.BACKOFF, StreamStatus.INITIALIZED,
-                            null, oafe.getCurrentOwner(), oafe);
-                statusChanged = true;
+            if (isCriticalException(cause)) {
+                logger.error("Failed to write data into stream {} : ", name, cause);
+            } else {
+                logger.warn("Failed to write data into stream {} : {}", name, cause.getMessage());
             }
+            requestClose("Failed to write data into stream " + name + " : " + cause.getMessage());
         }
-        if (statusChanged) {
-            Abortables.asyncAbort(oldWriter, false);
-            scheduleTryAcquireOnce(nextAcquireWaitTimeMs);
-        }
-        op.fail(oafe);
+        op.fail(cause);
     }
 
     /**
@@ -680,129 +540,126 @@ public class StreamImpl implements Stream {
         fatalErrorHandler.notifyFatalError();
     }
 
-    void countException(Throwable t, StatsLogger streamExceptionLogger) {
-        String exceptionName = null == t ? "null" : t.getClass().getName();
-        Counter counter = exceptionCounters.get(exceptionName);
-        if (null == counter) {
-            counter = exceptionStatLogger.getCounter(exceptionName);
-            Counter oldCounter = exceptionCounters.putIfAbsent(exceptionName, counter);
-            if (null != oldCounter) {
-                counter = oldCounter;
-            }
-        }
-        counter.inc();
-        streamExceptionLogger.getCounter(exceptionName).inc();
-    }
+    //
+    // Acquire streams
+    //
 
     Future<Boolean> acquireStream() {
-        // Reset this flag so the acquire thread knows whether re-acquire is needed.
-        writeSinceLastAcquire = false;
-
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final Promise<Boolean> acquirePromise = new Promise<Boolean>();
         manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
 
             @Override
             public void onSuccess(AsyncLogWriter w) {
-                synchronized (txnLock) {
-                    sequencer.setLastId(w.getLastTxId());
-                }
-                AsyncLogWriter oldWriter;
-                Queue<StreamOp> oldPendingOps;
-                boolean success;
-                synchronized (StreamImpl.this) {
-                    oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
-                            StreamStatus.INITIALIZING, w, null, null);
-                    oldPendingOps = pendingOps;
-                    pendingOps = new ArrayDeque<StreamOp>();
-                    success = true;
-                }
-                // check if the stream is allowed to be acquired
-                if (!streamManager.allowAcquire(StreamImpl.this)) {
-                    if (null != oldWriter) {
-                        Abortables.asyncAbort(oldWriter, true);
-                    }
-                    int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
-                    StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
-                            + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
-                    countException(sue, exceptionStatLogger);
-                    logger.error("Failed to acquire stream {} because it is unavailable : {}",
-                            name, sue.getMessage());
-                    synchronized (this) {
-                        oldWriter = setStreamStatus(StreamStatus.ERROR,
-                                StreamStatus.INITIALIZED, null, null, sue);
-                        // we don't switch the pending ops since they are already switched
-                        // when setting the status to initialized
-                        success = false;
-                    }
-                }
-                processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps);
+                onAcquireStreamSuccess(w, stopwatch, acquirePromise);
             }
 
             @Override
             public void onFailure(Throwable cause) {
-                AsyncLogWriter oldWriter;
-                Queue<StreamOp> oldPendingOps;
-                boolean success;
-                if (cause instanceof AlreadyClosedException) {
-                    countException(cause, streamExceptionStatLogger);
-                    handleAlreadyClosedException((AlreadyClosedException) cause);
-                    return;
-                } else if (cause instanceof OwnershipAcquireFailedException) {
-                    OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause;
-                    logger.warn("Failed to acquire stream ownership for {}, current owner is {} : {}",
-                            new Object[]{name, oafe.getCurrentOwner(), oafe.getMessage()});
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.BACKOFF,
-                                StreamStatus.INITIALIZING, null, oafe.getCurrentOwner(), oafe);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                } else if (cause instanceof InvalidStreamNameException) {
-                    InvalidStreamNameException isne = (InvalidStreamNameException) cause;
-                    countException(isne, streamExceptionStatLogger);
-                    logger.error("Failed to acquire stream {} due to its name is invalid", name);
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.ERROR,
-                                StreamStatus.INITIALIZING, null, null, isne);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                } else {
-                    countException(cause, streamExceptionStatLogger);
-                    logger.error("Failed to initialize stream {} : ", name, cause);
-                    synchronized (StreamImpl.this) {
-                        oldWriter = setStreamStatus(StreamStatus.FAILED,
-                                StreamStatus.INITIALIZING, null, null, cause);
-                        oldPendingOps = pendingOps;
-                        pendingOps = new ArrayDeque<StreamOp>();
-                        success = false;
-                    }
-                }
-                processPendingRequestsAfterOpen(success, oldWriter, oldPendingOps);
+                onAcquireStreamFailure(cause, stopwatch, acquirePromise);
             }
 
-            void processPendingRequestsAfterOpen(boolean success,
-                                                 AsyncLogWriter oldWriter,
-                                                 Queue<StreamOp> oldPendingOps) {
-                if (success) {
-                    streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                } else {
-                    streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
-                }
-                for (StreamOp op : oldPendingOps) {
-                    executeOp(op, success);
-                    pendingOpsCounter.dec();
-                }
-                Abortables.asyncAbort(oldWriter, true);
-                FutureUtils.setValue(acquirePromise, success);
-            }
         }, scheduler, getStreamName()));
         return acquirePromise;
     }
 
+    private void onAcquireStreamSuccess(AsyncLogWriter w,
+                                        Stopwatch stopwatch,
+                                        Promise<Boolean> acquirePromise) {
+        synchronized (txnLock) {
+            sequencer.setLastId(w.getLastTxId());
+        }
+        AsyncLogWriter oldWriter;
+        Queue<StreamOp> oldPendingOps;
+        boolean success;
+        synchronized (StreamImpl.this) {
+            oldWriter = setStreamStatus(StreamStatus.INITIALIZED,
+                    StreamStatus.INITIALIZING, w, null);
+            oldPendingOps = pendingOps;
+            pendingOps = new ArrayDeque<StreamOp>();
+            success = true;
+        }
+        // check if the stream is allowed to be acquired
+        if (!streamManager.allowAcquire(StreamImpl.this)) {
+            if (null != oldWriter) {
+                Abortables.asyncAbort(oldWriter, true);
+            }
+            int maxAcquiredPartitions = dynConf.getMaxAcquiredPartitionsPerProxy();
+            StreamUnavailableException sue = new StreamUnavailableException("Stream " + partition.getStream()
+                    + " is not allowed to acquire more than " + maxAcquiredPartitions + " partitions");
+            countException(sue, exceptionStatLogger);
+            logger.error("Failed to acquire stream {} because it is unavailable : {}",
+                    name, sue.getMessage());
+            synchronized (this) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR,
+                        StreamStatus.INITIALIZED, null, sue);
+                // we don't switch the pending ops since they are already switched
+                // when setting the status to initialized
+                success = false;
+            }
+        }
+        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+    }
+
+    private void onAcquireStreamFailure(Throwable cause,
+                                        Stopwatch stopwatch,
+                                        Promise<Boolean> acquirePromise) {
+        AsyncLogWriter oldWriter;
+        Queue<StreamOp> oldPendingOps;
+        boolean success;
+        if (cause instanceof AlreadyClosedException) {
+            countException(cause, streamExceptionStatLogger);
+            handleAlreadyClosedException((AlreadyClosedException) cause);
+            return;
+        } else {
+            if (isCriticalException(cause)) {
+                countException(cause, streamExceptionStatLogger);
+                logger.error("Failed to acquire stream {} : ", name, cause);
+            } else {
+                logger.warn("Failed to acquire stream {} : {}", name, cause.getMessage());
+            }
+            synchronized (StreamImpl.this) {
+                oldWriter = setStreamStatus(StreamStatus.ERROR,
+                        StreamStatus.INITIALIZING, null, cause);
+                oldPendingOps = pendingOps;
+                pendingOps = new ArrayDeque<StreamOp>();
+                success = false;
+            }
+        }
+        processPendingRequestsAfterAcquire(success, oldWriter, oldPendingOps, stopwatch, acquirePromise);
+    }
+
+    /**
+     * Process the pending request after acquired stream.
+     *
+     * @param success whether the acquisition succeed or not
+     * @param oldWriter the old writer to abort
+     * @param oldPendingOps the old pending ops to execute
+     * @param stopwatch stopwatch to measure the time spent on acquisition
+     * @param acquirePromise the promise to complete the acquire operation
+     */
+    void processPendingRequestsAfterAcquire(boolean success,
+                                            AsyncLogWriter oldWriter,
+                                            Queue<StreamOp> oldPendingOps,
+                                            Stopwatch stopwatch,
+                                            Promise<Boolean> acquirePromise) {
+        if (success) {
+            streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        } else {
+            streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
+        }
+        for (StreamOp op : oldPendingOps) {
+            executeOp(op, success);
+            pendingOpsCounter.dec();
+        }
+        Abortables.asyncAbort(oldWriter, true);
+        FutureUtils.setValue(acquirePromise, success);
+    }
+
+    //
+    // Stream Status Changes
+    //
+
     synchronized void setStreamInErrorStatus() {
         if (StreamStatus.CLOSING == status || StreamStatus.CLOSED == status) {
             return;
@@ -819,8 +676,6 @@ public class StreamImpl implements Stream {
      *          old status
      * @param writer
      *          new log writer
-     * @param owner
-     *          new owner
      * @param t
      *          new exception
      * @return old writer if it exists
@@ -828,7 +683,6 @@ public class StreamImpl implements Stream {
     synchronized AsyncLogWriter setStreamStatus(StreamStatus newStatus,
                                                 StreamStatus oldStatus,
                                                 AsyncLogWriter writer,
-                                                String owner,
                                                 Throwable t) {
         if (oldStatus != this.status) {
             logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}",
@@ -836,6 +690,11 @@ public class StreamImpl implements Stream {
             return null;
         }
 
+        String owner = null;
+        if (t instanceof OwnershipAcquireFailedException) {
+            owner = ((OwnershipAcquireFailedException) t).getCurrentOwner();
+        }
+
         AsyncLogWriter oldWriter = this.writer;
         this.writer = writer;
         if (null != owner && owner.equals(clientId)) {
@@ -852,10 +711,6 @@ public class StreamImpl implements Stream {
         }
         this.lastException = t;
         this.status = newStatus;
-        if (StreamStatus.BACKOFF == newStatus && null != owner) {
-            // start failure watch
-            this.lastAcquireFailureWatch.reset().start();
-        }
         if (StreamStatus.INITIALIZED == newStatus) {
             streamManager.notifyAcquired(this);
             logger.info("Inserted acquired stream {} -> writer {}", name, this);
@@ -866,12 +721,16 @@ public class StreamImpl implements Stream {
         return oldWriter;
     }
 
+    //
+    // Stream Close Functions
+    //
+
     void close(DistributedLogManager dlm) {
         if (null != dlm) {
             try {
                 dlm.close();
             } catch (IOException ioe) {
-                logger.warn("Failed to close dlm for {} : ", ioe);
+                logger.warn("Failed to close dlm for {} : ", name, ioe);
             }
         }
     }
@@ -902,12 +761,16 @@ public class StreamImpl implements Stream {
         // them.
         close(abort);
         if (uncache) {
+            final long probationTimeoutMs;
+            if (null != owner) {
+                probationTimeoutMs = 2 * dlConfig.getZKSessionTimeoutMilliseconds() / 3;
+            } else {
+                probationTimeoutMs = 0L;
+            }
             closePromise.onSuccess(new AbstractFunction1<Void, BoxedUnit>() {
                 @Override
                 public BoxedUnit apply(Void result) {
-                    if (streamManager.notifyRemoved(StreamImpl.this)) {
-                        logger.info("Removed cached stream {} after closed.", name);
-                    }
+                    streamManager.scheduleRemoval(StreamImpl.this, probationTimeoutMs);
                     return BoxedUnit.UNIT;
                 }
             });
@@ -949,14 +812,6 @@ public class StreamImpl implements Stream {
             closeLock.writeLock().unlock();
         }
         logger.info("Closing stream {} ...", name);
-        running = false;
-        // stop any outstanding ownership acquire actions first
-        synchronized (this) {
-            if (null != tryAcquireScheduledFuture) {
-                tryAcquireScheduledFuture.cancel(true);
-            }
-        }
-        logger.info("Stopped threads of stream {}.", name);
         // Close the writers to release the locks before failing the requests
         Future<Void> closeWriterFuture;
         if (abort) {
@@ -1016,19 +871,6 @@ public class StreamImpl implements Stream {
     // Test-only apis
 
     @VisibleForTesting
-    public StreamImpl suspendAcquiring() {
-        suspended = true;
-        return this;
-    }
-
-    @VisibleForTesting
-    public StreamImpl resumeAcquiring() {
-        suspended = false;
-        scheduleTryAcquireOnce(0L);
-        return this;
-    }
-
-    @VisibleForTesting
     public int numPendingOps() {
         Queue<StreamOp> queue = pendingOps;
         return null == queue ? 0 : queue.size();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
index 972eb55..e171e46 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
@@ -43,10 +43,11 @@ public interface StreamManager {
 
     /**
      * Get a cached stream and create a new one if it doesnt exist.
-     * @param stream name
+     * @param streamName stream name
+     * @param start whether to start the stream after it is created.
      * @return future satisfied once close complete
      */
-    Stream getOrCreateStream(String stream) throws IOException;
+    Stream getOrCreateStream(String streamName, boolean start) throws IOException;
 
     /**
      * Asynchronously create a new stream.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
index aa08a24..df336fe 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
@@ -33,15 +33,14 @@ import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.util.Future;
 import com.twitter.util.Promise;
-import java.io.IOException;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
@@ -233,7 +232,7 @@ public class StreamManagerImpl implements StreamManager {
     }
 
     @Override
-    public Stream getOrCreateStream(String streamName) throws IOException {
+    public Stream getOrCreateStream(String streamName, boolean start) throws IOException {
         Stream stream = streams.get(streamName);
         if (null == stream) {
             closeLock.readLock().lock();
@@ -261,7 +260,9 @@ public class StreamManagerImpl implements StreamManager {
                     numCached.getAndIncrement();
                     logger.info("Inserted mapping stream name {} -> stream {}", streamName, stream);
                     stream.initialize();
-                    stream.start();
+                    if (start) {
+                        stream.start();
+                    }
                 }
             } finally {
                 closeLock.readLock().unlock();
@@ -283,8 +284,10 @@ public class StreamManagerImpl implements StreamManager {
 
     @Override
     public void scheduleRemoval(final Stream stream, long delayMs) {
-        logger.info("Scheduling removal of stream {} from cache after {} sec.",
-            stream.getStreamName(), delayMs);
+        if (delayMs > 0) {
+            logger.info("Scheduling removal of stream {} from cache after {} sec.",
+                    stream.getStreamName(), delayMs);
+        }
         schedule(new Runnable() {
             @Override
             public void run() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f19e7564/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index 17fae4a..4195ed3 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -89,7 +89,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         dlConf.addConfiguration(conf);
         dlConf.setLockTimeout(0)
                 .setOutputBufferSize(0)
-                .setPeriodicFlushFrequencyMilliSeconds(10);
+                .setPeriodicFlushFrequencyMilliSeconds(10)
+                .setSchedulerShutdownTimeoutMs(100);
         serverConf = newLocalServerConf();
         uri = createDLMURI("/" + testName.getMethodName());
         ensureURICreated(uri);
@@ -171,10 +172,11 @@ public class TestDistributedLogService extends TestDistributedLogBase {
     public void testAcquireStreams() throws Exception {
         String streamName = testName.getMethodName();
         StreamImpl s0 = createUnstartedStream(service, streamName);
-        s0.suspendAcquiring();
-        DistributedLogServiceImpl service1 = createService(serverConf, dlConf);
+        ServerConfiguration serverConf1 = new ServerConfiguration();
+        serverConf1.addConfiguration(serverConf);
+        serverConf1.setServerPort(9999);
+        DistributedLogServiceImpl service1 = createService(serverConf1, dlConf);
         StreamImpl s1 = createUnstartedStream(service1, streamName);
-        s1.suspendAcquiring();
 
         // create write ops
         WriteOp op0 = createWriteOp(service, streamName, 0L);
@@ -190,7 +192,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
                 1, s1.numPendingOps());
 
         // start acquiring s0
-        s0.resumeAcquiring().start();
+        s0.start();
         WriteResponse wr0 = Await.result(op0.result());
         assertEquals("Op 0 should succeed",
                 StatusCode.SUCCESS, wr0.getHeader().getCode());
@@ -201,12 +203,12 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         assertNull(s0.getLastException());
 
         // start acquiring s1
-        s1.resumeAcquiring().start();
+        s1.start();
         WriteResponse wr1 = Await.result(op1.result());
         assertEquals("Op 1 should fail",
                 StatusCode.FOUND, wr1.getHeader().getCode());
-        assertEquals("Service 1 should be in BACKOFF state",
-                StreamStatus.BACKOFF, s1.getStatus());
+        assertEquals("Service 1 should be in ERROR state",
+                StreamStatus.ERROR, s1.getStatus());
         assertNotNull(s1.getManager());
         assertNull(s1.getWriter());
         assertNotNull(s1.getLastException());
@@ -727,7 +729,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
 
         for (Stream s : streamManager.getAcquiredStreams().values()) {
             StreamImpl stream = (StreamImpl) s;
-            stream.setStatus(StreamStatus.FAILED);
+            stream.setStatus(StreamStatus.ERROR);
         }
 
         Future<List<Void>> closeResult = localService.closeStreams();


[18/29] incubator-distributedlog git commit: fix findbugs

Posted by si...@apache.org.
fix findbugs


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/76be38e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/76be38e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/76be38e5

Branch: refs/heads/merge/DL-98
Commit: 76be38e54faf5c0f0a3b4fc70cc70b602daf69e6
Parents: c4a6020
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:50:48 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:50:48 2016 -0800

----------------------------------------------------------------------
 .../java/com/twitter/distributedlog/DistributedLogConstants.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/76be38e5/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
index 32def94..e798a0f 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
@@ -58,7 +58,7 @@ public class DistributedLogConstants {
     public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
     public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
     static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
-    public static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
+    static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
 
     // An ACL that gives all permissions to node creators and read permissions only to everyone else.
     public static final List<ACL> EVERYONE_READ_CREATOR_ALL =


[13/29] incubator-distributedlog git commit: Make the zookeeper client used by bookkeeper client retry on session expires

Posted by si...@apache.org.
Make the zookeeper client used by bookkeeper client retry on session expires

* the zookeeper client used by bookkeeper client is purely for metadata accesses, so we should retry on session expires.
* remove the unnecessary zookeeper session handling in bk log handler, as it is not necessary to fail the bookkeeper client or log handler when the session expires; the expiry is handled and retried by the zookeeper client.
* Retry infinitely if the retry setting for the bkc zookeeper client is set to 0 or negative.

RB_ID=843057


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f18fe172
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f18fe172
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f18fe172

Branch: refs/heads/merge/DL-98
Commit: f18fe172fb4559bcabb7173d1cd0b9d27fc23c93
Parents: 72a786e
Author: Sijie Guo <si...@twitter.com>
Authored: Mon Jul 11 10:21:44 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:58:20 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogReaderDLSN.java    | 15 +-----
 .../BKDistributedLogNamespace.java              |  5 +-
 .../twitter/distributedlog/BKLogHandler.java    | 17 -------
 .../distributedlog/BookKeeperClient.java        | 48 ++++----------------
 .../DistributedLogConfiguration.java            |  8 +++-
 5 files changed, 17 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
index 7d3d53d..b1a9273 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogReaderDLSN.java
@@ -49,7 +49,6 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Function1;
@@ -73,7 +72,7 @@ import scala.runtime.AbstractFunction1;
  * <li> `async_reader`/idle_reader_error: counter. the number idle reader errors.
  * </ul>
  */
-class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNotifier, AsyncLogReader, Runnable, AsyncNotification {
+class BKAsyncLogReaderDLSN implements AsyncLogReader, Runnable, AsyncNotification {
     static final Logger LOG = LoggerFactory.getLogger(BKAsyncLogReaderDLSN.class);
 
     private static final Function1<List<LogRecordWithDLSN>, LogRecordWithDLSN> READ_NEXT_MAP_FUNCTION =
@@ -86,7 +85,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
 
     protected final BKDistributedLogManager bkDistributedLogManager;
     protected final BKLogReadHandler bkLedgerManager;
-    private Watcher sessionExpireWatcher = null;
     private final AtomicReference<Throwable> lastException = new AtomicReference<Throwable>();
     private final ScheduledExecutorService executorService;
     private final ConcurrentLinkedQueue<PendingReadRequest> pendingRequests = new ConcurrentLinkedQueue<PendingReadRequest>();
@@ -218,7 +216,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
         this.executorService = executorService;
         this.bkLedgerManager = bkDistributedLogManager.createReadHandler(subscriberId,
                 lockStateExecutor, this, deserializeRecordSet, true);
-        sessionExpireWatcher = this.bkLedgerManager.registerExpirationHandler(this);
         LOG.debug("Starting async reader at {}", startDLSN);
         this.startDLSN = startDLSN;
         this.scheduleDelayStopwatch = Stopwatch.createUnstarted();
@@ -255,14 +252,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
         this.idleReaderTimeoutTask = scheduleIdleReaderTaskIfNecessary();
     }
 
-    @Override
-    public void notifySessionExpired() {
-        // ZK Session notification is an indication to check if this has resulted in a fatal error
-        // of the underlying reader, in itself this reader doesnt error out unless the underlying
-        // reader has hit an error
-        scheduleBackgroundRead();
-    }
-
     private ScheduledFuture<?> scheduleIdleReaderTaskIfNecessary() {
         if (idleErrorThresholdMillis < Integer.MAX_VALUE) {
             // Dont run the task more than once every seconds (for sanity)
@@ -494,8 +483,6 @@ class BKAsyncLogReaderDLSN implements ZooKeeperClient.ZooKeeperSessionExpireNoti
 
         cancelAllPendingReads(exception);
 
-        bkLedgerManager.unregister(sessionExpireWatcher);
-
         FutureUtils.ignore(bkLedgerManager.asyncClose()).proxyTo(closePromise);
         return closePromise;
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 0b522d0..7a4fd7f 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -622,13 +622,10 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
                                                                   DistributedLogConfiguration conf,
                                                                   String zkServers,
                                                                   StatsLogger statsLogger) {
-        RetryPolicy retryPolicy = null;
-        if (conf.getZKNumRetries() > 0) {
-            retryPolicy = new BoundExponentialBackoffRetryPolicy(
+        RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                     conf.getBKClientZKRetryBackoffStartMillis(),
                     conf.getBKClientZKRetryBackoffMaxMillis(),
                     conf.getBKClientZKNumRetries());
-        }
         ZooKeeperClientBuilder builder = ZooKeeperClientBuilder.newBuilder()
                 .name(zkcName)
                 .sessionTimeoutMs(conf.getBKClientZKSessionTimeoutMilliSeconds())

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index a6ec318..a84261a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -1293,21 +1293,4 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         }
     }
 
-    // ZooKeeper Watchers
-
-    Watcher registerExpirationHandler(final ZooKeeperClient.ZooKeeperSessionExpireNotifier onExpired) {
-        if (conf.getZKNumRetries() > 0) {
-            return new Watcher() {
-                @Override
-                public void process(WatchedEvent event) {
-                    // nop
-                }
-            };
-        }
-        return zooKeeperClient.registerExpirationHandler(onExpired);
-    }
-
-    boolean unregister(Watcher watcher) {
-        return zooKeeperClient.unregister(watcher);
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
index fd22b8f..c39ae4c 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BookKeeperClient.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog;
 
+import com.google.common.base.Optional;
 import com.twitter.distributedlog.ZooKeeperClient.Credentials;
 import com.twitter.distributedlog.ZooKeeperClient.DigestCredentials;
 import com.twitter.distributedlog.exceptions.AlreadyClosedException;
@@ -41,16 +42,12 @@ import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
 import org.apache.bookkeeper.zookeeper.RetryPolicy;
 import org.apache.commons.configuration.ConfigurationException;
 import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.Watcher;
 import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import com.google.common.base.Optional;
 
 import static com.google.common.base.Charsets.UTF_8;
 
@@ -62,7 +59,7 @@ import static com.google.common.base.Charsets.UTF_8;
  * <li> bookkeeper operation stats are exposed under current scope by {@link BookKeeper}
  * </ul>
  */
-public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireNotifier {
+public class BookKeeperClient {
     static final Logger LOG = LoggerFactory.getLogger(BookKeeperClient.class);
 
     // Parameters to build bookkeeper client
@@ -83,14 +80,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
     // feature provider
     private final Optional<FeatureProvider> featureProvider;
 
-    private Watcher sessionExpireWatcher = null;
-    private AtomicBoolean zkSessionExpired = new AtomicBoolean(false);
-
     @SuppressWarnings("deprecation")
     private synchronized void commonInitialization(
             DistributedLogConfiguration conf, String ledgersPath,
-            ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer,
-            boolean registerExpirationHandler)
+            ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer)
         throws IOException, InterruptedException, KeeperException {
         ClientConfiguration bkConfig = new ClientConfiguration();
         bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout());
@@ -124,10 +117,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
             .requestTimer(requestTimer)
             .featureProvider(featureProvider.orNull())
             .build();
-
-        if (registerExpirationHandler) {
-            sessionExpireWatcher = this.zkc.registerExpirationHandler(this);
-        }
     }
 
     BookKeeperClient(DistributedLogConfiguration conf,
@@ -159,16 +148,11 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
         if (null != this.bkc) {
             return;
         }
-        boolean registerExpirationHandler;
         if (null == this.zkc) {
             int zkSessionTimeout = conf.getBKClientZKSessionTimeoutMilliSeconds();
-            RetryPolicy retryPolicy = null;
-            if (conf.getBKClientZKNumRetries() > 0) {
-                retryPolicy = new BoundExponentialBackoffRetryPolicy(
+            RetryPolicy retryPolicy = new BoundExponentialBackoffRetryPolicy(
                         conf.getBKClientZKRetryBackoffStartMillis(),
                         conf.getBKClientZKRetryBackoffMaxMillis(), conf.getBKClientZKNumRetries());
-            }
-
             Credentials credentials = Credentials.NONE;
             if (conf.getZkAclId() != null) {
                 credentials = new DigestCredentials(conf.getZkAclId(), conf.getZkAclId());
@@ -178,10 +162,9 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
                                            retryPolicy, statsLogger.scope("bkc_zkc"), conf.getZKClientNumberRetryThreads(),
                                            conf.getBKClientZKRequestRateLimit(), credentials);
         }
-        registerExpirationHandler = conf.getBKClientZKNumRetries() <= 0;
 
         try {
-            commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer, registerExpirationHandler);
+            commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer);
         } catch (InterruptedException e) {
             throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e);
         } catch (KeeperException e) {
@@ -190,18 +173,18 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
 
         if (ownZK) {
             LOG.info("BookKeeper Client created {} with its own ZK Client : ledgersPath = {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}",
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
                     new Object[] { name, ledgersPath,
                     conf.getBKClientZKNumRetries(), conf.getBKClientZKSessionTimeoutMilliSeconds(),
                     conf.getBKClientZKRetryBackoffStartMillis(), conf.getBKClientZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides(), registerExpirationHandler });
+                    conf.getBkDNSResolverOverrides() });
         } else {
             LOG.info("BookKeeper Client created {} with shared zookeeper client : ledgersPath = {}, numRetries = {}, " +
-                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}, registerExpirationHandler = {}",
+                    "sessionTimeout = {}, backoff = {}, maxBackoff = {}, dnsResolver = {}",
                     new Object[] { name, ledgersPath,
                     conf.getZKNumRetries(), conf.getZKSessionTimeoutMilliseconds(),
                     conf.getZKRetryBackoffStartMillis(), conf.getZKRetryBackoffMaxMillis(),
-                    conf.getBkDNSResolverOverrides(), registerExpirationHandler });
+                    conf.getBkDNSResolverOverrides() });
         }
     }
 
@@ -284,9 +267,6 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
             }
         }
         if (null != zkc) {
-            if (null != sessionExpireWatcher) {
-                zkc.unregister(sessionExpireWatcher);
-            }
             if (ownZK) {
                 zkc.close();
             }
@@ -294,20 +274,10 @@ public class BookKeeperClient implements ZooKeeperClient.ZooKeeperSessionExpireN
         closed = true;
     }
 
-    @Override
-    public void notifySessionExpired() {
-        zkSessionExpired.set(true);
-    }
-
     public synchronized void checkClosedOrInError() throws AlreadyClosedException {
         if (closed) {
             LOG.error("BookKeeper Client {} is already closed", name);
             throw new AlreadyClosedException("BookKeeper Client " + name + " is already closed");
         }
-
-        if (zkSessionExpired.get()) {
-            LOG.error("BookKeeper Client {}'s Zookeeper session has expired", name);
-            throw new AlreadyClosedException("BookKeeper Client " + name + "'s Zookeeper session has expired");
-        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f18fe172/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index c2057df..5d0e59a 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -809,12 +809,16 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
      * Get num of retries for zookeeper client that used by bookkeeper client.
      * <p>Retries only happen on retryable failures like session expired,
      * session moved. for permanent failures, the request will fail immediately.
-     * The default value is 3.
+     * The default value is 3. Setting it to zero or negative will retry infinitely.
      *
      * @return num of retries of zookeeper client used by bookkeeper client.
      */
     public int getBKClientZKNumRetries() {
-        return this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        int zkNumRetries = this.getInt(BKDL_BKCLIENT_ZK_NUM_RETRIES, BKDL_BKCLIENT_ZK_NUM_RETRIES_DEFAULT);
+        if (zkNumRetries <= 0) {
+            return Integer.MAX_VALUE;
+        }
+        return zkNumRetries;
     }
 
     /**


[08/29] incubator-distributedlog git commit: Don't use stack and codec together for configuring thriftmux

Posted by si...@apache.org.
Don't use stack and codec together for configuring thriftmux

- Don't use stack and codec together for configuring thriftmux
- Remove codec if the thriftmux is disabled


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/800b867b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/800b867b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/800b867b

Branch: refs/heads/merge/DL-98
Commit: 800b867b705a786094b9a591e0ebc64fa934f632
Parents: d3a97bc
Author: Dave Rusek <da...@gmail.com>
Authored: Mon Dec 12 16:41:47 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:41:47 2016 -0800

----------------------------------------------------------------------
 .../client/proxy/ProxyClient.java               | 28 ++++++++++++--------
 1 file changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/800b867b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java
index 03dd3c2..4c79327 100644
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java
+++ b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java
@@ -75,30 +75,36 @@ public class ProxyClient {
             this.clientId = clientId;
             this.clientStats = clientStats;
             // client builder
-            ClientBuilder builder = setDefaultSettings(null == clientBuilder ? getDefaultClientBuilder() : clientBuilder);
-            if (clientConfig.getThriftMux()) {
-                builder = enableThriftMux(builder, clientId);
-            }
-            this.clientBuilder = builder;
+            ClientBuilder builder = setDefaultSettings(
+                    null == clientBuilder ? getDefaultClientBuilder(clientConfig) : clientBuilder);
+            this.clientBuilder = configureThriftMux(builder, clientId, clientConfig);
         }
 
         @SuppressWarnings("unchecked")
-        private ClientBuilder enableThriftMux(ClientBuilder builder, ClientId clientId) {
-            return builder.stack(ThriftMux.client().withClientId(clientId));
+        private ClientBuilder configureThriftMux(ClientBuilder builder,
+                                                 ClientId clientId,
+                                                 ClientConfig clientConfig) {
+            if (clientConfig.getThriftMux()) {
+                return builder.stack(ThriftMux.client().withClientId(clientId));
+            } else {
+                return builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
+            }
         }
 
-        private ClientBuilder getDefaultClientBuilder() {
-            return ClientBuilder.get()
-                .hostConnectionLimit(1)
+        private ClientBuilder getDefaultClientBuilder(ClientConfig clientConfig) {
+            ClientBuilder builder = ClientBuilder.get()
                 .tcpConnectTimeout(Duration.fromMilliseconds(200))
                 .connectTimeout(Duration.fromMilliseconds(200))
                 .requestTimeout(Duration.fromSeconds(1));
+            if (!clientConfig.getThriftMux()) {
+                builder = builder.hostConnectionLimit(1);
+            }
+            return builder;
         }
 
         @SuppressWarnings("unchecked")
         private ClientBuilder setDefaultSettings(ClientBuilder builder) {
             return builder.name(clientName)
-                   .codec(ThriftClientFramedCodec.apply(Option.apply(clientId)))
                    .failFast(false)
                    .noFailureAccrual()
                    // disable retries on finagle client builder, as there is only one host per finagle client


[06/29] incubator-distributedlog git commit: Remove watcher only when it registered watcher

Posted by si...@apache.org.
 Remove watcher only when it registered watcher

 (reduce the annoying logging from zookeeper client about "Failed to find watcher!")

RB_ID=837073


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/98dc9ab2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/98dc9ab2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/98dc9ab2

Branch: refs/heads/merge/DL-98
Commit: 98dc9ab2df8af7af26ca68dbe93be3d420cda417
Parents: b571d3b
Author: Sijie Guo <si...@twitter.com>
Authored: Thu May 26 17:06:31 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:37:27 2016 -0800

----------------------------------------------------------------------
 .../main/java/com/twitter/distributedlog/BKLogHandler.java    | 7 -------
 .../java/com/twitter/distributedlog/BKLogReadHandler.java     | 7 ++++++-
 .../java/com/twitter/distributedlog/BKLogWriteHandler.java    | 6 +++++-
 .../com/twitter/distributedlog/readahead/ReadAheadWorker.java | 2 +-
 .../java/com/twitter/distributedlog/zk/ZKWatcherManager.java  | 4 ++--
 .../com/twitter/distributedlog/zk/TestZKWatcherManager.java   | 2 +-
 6 files changed, 15 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98dc9ab2/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 9aa3465..a6ec318 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -702,13 +702,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
     }
 
     @Override
-    public Future<Void> asyncClose() {
-        // No-op
-        this.zooKeeperClient.getWatcherManager().unregisterChildWatcher(logMetadata.getLogSegmentsPath(), this);
-        return Future.Void();
-    }
-
-    @Override
     public Future<Void> asyncAbort() {
         return asyncClose();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98dc9ab2/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
index 0bf6b84..6a8f90e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogReadHandler.java
@@ -311,7 +311,12 @@ class BKLogReadHandler extends BKLogHandler {
                 if (null != handleCache) {
                     handleCache.clear();
                 }
-                return BKLogReadHandler.super.asyncClose();
+                // No-op
+                zooKeeperClient.getWatcherManager().unregisterChildWatcher(
+                        logMetadata.getLogSegmentsPath(),
+                        BKLogReadHandler.this,
+                        true);
+                return Future.Void();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98dc9ab2/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
index 573679a..4665ed5 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogWriteHandler.java
@@ -1263,7 +1263,11 @@ class BKLogWriteHandler extends BKLogHandler {
         ).flatMap(new AbstractFunction1<Void, Future<Void>>() {
             @Override
             public Future<Void> apply(Void result) {
-                return BKLogWriteHandler.super.asyncClose();
+                zooKeeperClient.getWatcherManager().unregisterChildWatcher(
+                        logMetadata.getLogSegmentsPath(),
+                        BKLogWriteHandler.this,
+                        false);
+                return Future.Void();
             }
         });
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98dc9ab2/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
index a3fd239..5c15009 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/readahead/ReadAheadWorker.java
@@ -359,7 +359,7 @@ public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, As
         running = false;
 
         this.zkc.getWatcherManager()
-                .unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), this);
+                .unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), this, true);
 
         // Aside from unfortunate naming of variables, this allows
         // the currently active long poll to be interrupted and completed

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98dc9ab2/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
index a24b560..03b2841 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
@@ -139,7 +139,7 @@ public class ZKWatcherManager implements Watcher {
         return this;
     }
 
-    public void unregisterChildWatcher(String path, Watcher watcher) {
+    public void unregisterChildWatcher(String path, Watcher watcher, boolean removeFromServer) {
         Set<Watcher> watchers = childWatches.get(path);
         if (null == watchers) {
             logger.warn("No watchers found on path {} while unregistering child watcher {}.",
@@ -155,7 +155,7 @@ public class ZKWatcherManager implements Watcher {
             if (watchers.isEmpty()) {
                 // best-efforts to remove watches
                 try {
-                    if (null != zkc) {
+                    if (null != zkc && removeFromServer) {
                         zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() {
                             @Override
                             public void processResult(int rc, String path, Object ctx) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/98dc9ab2/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
index 6f269c3..3ad181d 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
@@ -72,7 +72,7 @@ public class TestZKWatcherManager {
         assertEquals(event2, events.get(1));
 
         // unregister watcher
-        watcherManager.unregisterChildWatcher(path, watcher);
+        watcherManager.unregisterChildWatcher(path, watcher, true);
 
         assertEquals(0, watcherManager.childWatches.size());
     }


[17/29] incubator-distributedlog git commit: Merge remote-tracking branch 'apache/master' into merge/DL-90

Posted by si...@apache.org.
Merge remote-tracking branch 'apache/master' into merge/DL-90


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/c4a6020c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/c4a6020c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/c4a6020c

Branch: refs/heads/merge/DL-98
Commit: c4a6020c39d3a91473a3dad4b687320deaf7bc63
Parents: 800b867 132dadc
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:35:36 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:35:36 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 .travis.yml                                     |   4 +
 Dockerfile                                      |  10 +-
 Vagrantfile                                     |   5 +-
 distributedlog-benchmark/bin/bundle             |   7 +-
 distributedlog-benchmark/bin/dbench             |  15 +-
 distributedlog-benchmark/conf/dlogenv.sh        |   2 +-
 .../routing/ConsistentHashRoutingService.java   |   2 +-
 distributedlog-core/bin/dlog                    |  12 +-
 distributedlog-core/conf/write_proxy.conf       |   2 +-
 distributedlog-service/bin/bundle               |   7 +-
 distributedlog-service/bin/dlog                 |  10 +-
 distributedlog-service/bin/dlog-daemon.sh       | 170 +++++++++----------
 .../distributedlog-kafka/bin/runner             |   1 -
 docker/Dockerfile                               |  35 ----
 docs/admin_guide/vagrant.rst                    |   4 +-
 docs/basics/introduction.rst                    |  14 +-
 docs/deployment/cluster.rst                     |  58 ++++---
 docs/deployment/docker.rst                      |   2 +-
 pom.xml                                         |  11 +-
 scripts/bundle                                  |   2 -
 scripts/common.sh                               |  10 +-
 scripts/integration/smoketest.sh                | 113 ++++++++++++
 scripts/runner                                  |  12 +-
 scripts/snapshot                                |  11 +-
 vagrant/base.sh                                 |  43 ++---
 vagrant/bk.sh                                   |  64 +++----
 vagrant/zk.sh                                   |   5 +-
 28 files changed, 361 insertions(+), 274 deletions(-)
----------------------------------------------------------------------



[27/29] incubator-distributedlog git commit: Remove gauge when the log segment writer is removed

Posted by si...@apache.org.
Remove gauge when the log segment writer is removed

RB_ID=849452


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/9709f9f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/9709f9f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/9709f9f9

Branch: refs/heads/merge/DL-98
Commit: 9709f9f9c2ad205cd2ebef765688b0b3b3a9a1cc
Parents: f95a0a8
Author: Sijie Guo <si...@twitter.com>
Authored: Mon Jul 11 11:11:15 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Tue Dec 20 16:26:50 2016 -0800

----------------------------------------------------------------------

----------------------------------------------------------------------



[04/29] incubator-distributedlog git commit: Introduce periodic keepalive control record in writer

Posted by si...@apache.org.
Introduce periodic keepalive control record in writer

    * The writer periodically writes a 'keepalive' control record to make sure the stream is alive. If the write proxy is disconnected from the bookies, the control record will fail to write, which gives the write proxy a chance to drop the ownership.


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/517c77c1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/517c77c1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/517c77c1

Branch: refs/heads/merge/DL-98
Commit: 517c77c164cb989ae9829cbd80bf2e492eb8e364
Parents: 7b46a9a
Author: Leigh Stewart <ls...@twitter.com>
Authored: Mon Dec 12 16:33:33 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:33:33 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKLogSegmentWriter.java      | 47 ++++++++++++++++-
 .../DistributedLogConfiguration.java            | 25 +++++++++
 .../distributedlog/DistributedLogConstants.java |  1 +
 .../distributedlog/TestAsyncReaderWriter.java   | 53 ++++++++++++++++++++
 4 files changed, 125 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 004b2fb..1b52951 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -72,6 +72,7 @@ import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction1;
@@ -128,6 +129,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private long numFlushesSinceRestart = 0;
     private long numBytes = 0;
     private long lastEntryId = Long.MIN_VALUE;
+    private long lastTransmitNanos = Long.MIN_VALUE;
+    private final int periodicKeepAliveMs;
 
     // Indicates whether there are writes that have been successfully transmitted that would need
     // a control record to be transmitted to make them visible to the readers by updating the last
@@ -137,7 +140,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private int minDelayBetweenImmediateFlushMs = 0;
     private Stopwatch lastTransmit;
     private boolean streamEnded = false;
-    private ScheduledFuture<?> periodicFlushSchedule = null;
+    private final ScheduledFuture<?> periodicFlushSchedule;
+    private final ScheduledFuture<?> periodicKeepAliveSchedule;
     final private AtomicReference<ScheduledFuture<?>> transmitSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<ScheduledFuture<?>> immFlushSchedFutureRef = new AtomicReference<ScheduledFuture<?>>(null);
     final private AtomicReference<Exception> scheduledFlushException = new AtomicReference<Exception>(null);
@@ -312,11 +316,25 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
             if (periodicFlushFrequency > 0 && scheduler != null) {
                 periodicFlushSchedule = scheduler.scheduleAtFixedRate(this,
                         periodicFlushFrequency/2, periodicFlushFrequency/2, TimeUnit.MILLISECONDS);
+            } else {
+                periodicFlushSchedule = null;
             }
         } else {
             // Min delay heuristic applies only when immediate flush is enabled
             // and transmission threshold is zero
             minDelayBetweenImmediateFlushMs = conf.getMinDelayBetweenImmediateFlushMs();
+            periodicFlushSchedule = null;
+        }
+        this.periodicKeepAliveMs = conf.getPeriodicKeepAliveMilliSeconds();
+        if (periodicKeepAliveMs > 0 && scheduler != null) {
+            periodicKeepAliveSchedule = scheduler.scheduleAtFixedRate(new Runnable() {
+                @Override
+                public void run() {
+                    keepAlive();
+                }
+            }, periodicKeepAliveMs, periodicKeepAliveMs, TimeUnit.MILLISECONDS);
+        } else {
+            periodicKeepAliveSchedule = null;
         }
 
         this.conf = conf;
@@ -513,6 +531,13 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private void closeInternal(final boolean abort,
                                final AtomicReference<Throwable> throwExc,
                                final Promise<Void> closePromise) {
+        // Cancel the periodic keep alive schedule first
+        if (null != periodicKeepAliveSchedule) {
+            if (!periodicKeepAliveSchedule.cancel(false)) {
+                LOG.info("Periodic keepalive for log segment {} isn't cancelled.", getFullyQualifiedLogSegment());
+            }
+        }
+
         // Cancel the periodic flush schedule first
         // The task is allowed to exit gracefully
         if (null != periodicFlushSchedule) {
@@ -1079,6 +1104,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
             }
 
             synchronized (this) {
+                // update the transmit timestamp
+                lastTransmitNanos = MathUtils.nowInNano();
+
                 BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
                 packetPrevious = packet;
                 entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
@@ -1293,4 +1321,21 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
         }
     }
 
+    synchronized private void keepAlive() {
+        if (null != closeFuture) {
+            // if the log segment is closing, skip sending any keep alive records.
+            LOG.debug("Skip sending keepAlive control record since log segment {} is closing.",
+                    getFullyQualifiedLogSegment());
+            return;
+        }
+
+        if (MathUtils.elapsedMSec(lastTransmitNanos) < periodicKeepAliveMs) {
+            return;
+        }
+
+        LogRecord controlRec = new LogRecord(lastTxId, DistributedLogConstants.KEEPALIVE_RECORD_CONTENT);
+        controlRec.setControl();
+        asyncWrite(controlRec);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
index d2af862..c2057df 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConfiguration.java
@@ -249,6 +249,8 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
     public static final boolean BKDL_ENABLE_IMMEDIATE_FLUSH_DEFAULT = false;
     public static final String BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS = "minimumDelayBetweenImmediateFlushMilliSeconds";
     public static final int BKDL_MINIMUM_DELAY_BETWEEN_IMMEDIATE_FLUSH_MILLISECONDS_DEFAULT = 0;
+    public static final String BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS = "periodicKeepAliveMilliSeconds";
+    public static final int BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT = 0;
 
     // Retention/Truncation Settings
     public static final String BKDL_RETENTION_PERIOD_IN_HOURS = "logSegmentRetentionHours";
@@ -1893,6 +1895,29 @@ public class DistributedLogConfiguration extends CompositeConfiguration {
         return this;
     }
 
+    /**
+     * Get Periodic Keep Alive Frequency in milliseconds.
+     * <p>If the setting is set with a positive value, it would periodically write a control record
+     * to keep the stream active. The default value is 0.
+     *
+     * @return periodic keep alive frequency in milliseconds.
+     */
+    public int getPeriodicKeepAliveMilliSeconds() {
+        return this.getInt(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS_DEFAULT);
+    }
+
+    /**
+     * Set Periodic Keep Alive Frequency in milliseconds.
+     *
+     * @param keepAliveMs keep alive frequency in milliseconds.
+     * @return distributedlog configuration
+     * @see #getPeriodicKeepAliveMilliSeconds()
+     */
+    public DistributedLogConfiguration setPeriodicKeepAliveMilliSeconds(int keepAliveMs) {
+        setProperty(BKDL_PERIODIC_KEEP_ALIVE_MILLISECONDS, keepAliveMs);
+        return this;
+    }
+
     //
     // DL Retention/Truncation Settings
     //

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
index 5c50282..32def94 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/DistributedLogConstants.java
@@ -58,6 +58,7 @@ public class DistributedLogConstants {
     public static final String COMPLETED_LOGSEGMENT_PREFIX = "logrecs";
     public static final String DISALLOW_PLACEMENT_IN_REGION_FEATURE_NAME = "disallow_bookie_placement";
     static final byte[] CONTROL_RECORD_CONTENT = "control".getBytes(UTF_8);
+    public static final byte[] KEEPALIVE_RECORD_CONTENT = "keepalive".getBytes(UTF_8);
 
     // An ACL that gives all permissions to node creators and read permissions only to everyone else.
     public static final List<ACL> EVERYONE_READ_CREATOR_ALL =

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/517c77c1/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
index 8011a04..a4832b0 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java
@@ -2115,4 +2115,57 @@ public class TestAsyncReaderWriter extends TestDistributedLogBase {
             }
         }
     }
+
+    @Test(timeout = 60000)
+    public void testIdleReaderExceptionWhenKeepAliveIsDisabled() throws Exception {
+        String name = runtime.getMethodName();
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(testConf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setPeriodicKeepAliveMilliSeconds(0);
+        confLocal.setReaderIdleWarnThresholdMillis(20);
+        confLocal.setReaderIdleErrorThresholdMillis(40);
+
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+
+        DistributedLogManager dlm = createNewDLM(confLocal, name);
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+        writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+        AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        try {
+            FutureUtils.result(reader.readNext());
+            fail("Should fail when stream is idle");
+        } catch (IdleReaderException ire) {
+            // expected
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testIdleReaderExceptionWhenKeepAliveIsEnabled() throws Exception {
+        String name = runtime.getMethodName();
+        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
+        confLocal.addConfiguration(testConf);
+        confLocal.setOutputBufferSize(0);
+        confLocal.setImmediateFlushEnabled(false);
+        confLocal.setPeriodicFlushFrequencyMilliSeconds(0);
+        confLocal.setPeriodicKeepAliveMilliSeconds(1000);
+        confLocal.setReaderIdleWarnThresholdMillis(2000);
+        confLocal.setReaderIdleErrorThresholdMillis(4000);
+
+        URI uri = createDLMURI("/" + name);
+        ensureURICreated(uri);
+
+        DistributedLogManager dlm = createNewDLM(confLocal, name);
+        BKAsyncLogWriter writer = (BKAsyncLogWriter) FutureUtils.result(dlm.openAsyncLogWriter());
+        writer.write(DLMTestUtil.getLogRecordInstance(1L));
+
+        AsyncLogReader reader = FutureUtils.result(dlm.openAsyncLogReader(DLSN.InitialDLSN));
+        LogRecordWithDLSN record = FutureUtils.result(reader.readNext());
+        assertEquals(1L, record.getTransactionId());
+        DLMTestUtil.verifyLogRecord(record);
+    }
 }


[20/29] incubator-distributedlog git commit: Merge branch 'merge/DL-91' into merge/DL-92

Posted by si...@apache.org.
Merge branch 'merge/DL-91' into merge/DL-92


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/adc8ad19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/adc8ad19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/adc8ad19

Branch: refs/heads/merge/DL-98
Commit: adc8ad198d59a05dc623818c0948f8f219ce0a8b
Parents: f19e756 4fc6f3d
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:51:41 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:51:41 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 .travis.yml                                     |   4 +
 Dockerfile                                      |  10 +-
 Vagrantfile                                     |   5 +-
 distributedlog-benchmark/bin/bundle             |   7 +-
 distributedlog-benchmark/bin/dbench             |  15 +-
 distributedlog-benchmark/conf/dlogenv.sh        |   2 +-
 .../routing/ConsistentHashRoutingService.java   |   2 +-
 distributedlog-core/bin/dlog                    |  12 +-
 distributedlog-core/conf/write_proxy.conf       |   2 +-
 .../distributedlog/DistributedLogConstants.java |   2 +-
 distributedlog-service/bin/bundle               |   7 +-
 distributedlog-service/bin/dlog                 |  10 +-
 distributedlog-service/bin/dlog-daemon.sh       | 170 +++++++++----------
 .../distributedlog-kafka/bin/runner             |   1 -
 docker/Dockerfile                               |  35 ----
 docs/admin_guide/vagrant.rst                    |   4 +-
 docs/basics/introduction.rst                    |  14 +-
 docs/deployment/cluster.rst                     |  58 ++++---
 docs/deployment/docker.rst                      |   2 +-
 pom.xml                                         |  11 +-
 scripts/bundle                                  |   2 -
 scripts/common.sh                               |  10 +-
 scripts/integration/smoketest.sh                | 113 ++++++++++++
 scripts/runner                                  |  12 +-
 scripts/snapshot                                |  11 +-
 vagrant/base.sh                                 |  43 ++---
 vagrant/bk.sh                                   |  64 +++----
 vagrant/zk.sh                                   |   5 +-
 29 files changed, 362 insertions(+), 275 deletions(-)
----------------------------------------------------------------------



[14/29] incubator-distributedlog git commit: Assign host ip client id if the client id is undefined

Posted by si...@apache.org.
Assign host ip client id if the client id is undefined

    RB_ID=843132


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f4f633fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f4f633fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f4f633fe

Branch: refs/heads/merge/DL-98
Commit: f4f633fe655e7146d5aad89c24f7e7107ab65ab2
Parents: f18fe17
Author: Leigh Stewart <ls...@twitter.com>
Authored: Mon Dec 12 16:59:38 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:59:38 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKDistributedLogNamespace.java   | 15 ++++++++++++++-
 .../com/twitter/distributedlog/BKLogHandler.java    | 16 +---------------
 2 files changed, 15 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f4f633fe/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
index 7a4fd7f..2df1046 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogNamespace.java
@@ -75,6 +75,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.net.URI;
 import java.util.Collection;
 import java.util.HashMap;
@@ -252,6 +253,14 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         }
     }
 
+    private static String getHostIpLockClientId() {
+        try {
+            return InetAddress.getLocalHost().toString();
+        } catch(Exception ex) {
+            return DistributedLogConstants.UNKNOWN_CLIENT_ID;
+        }
+    }
+
     private final String clientId;
     private final int regionId;
     private final DistributedLogConfiguration conf;
@@ -326,9 +335,13 @@ public class BKDistributedLogNamespace implements DistributedLogNamespace {
         this.featureProvider = featureProvider;
         this.statsLogger = statsLogger;
         this.perLogStatsLogger = perLogStatsLogger;
-        this.clientId = clientId;
         this.regionId = regionId;
         this.bkdlConfig = bkdlConfig;
+        if (clientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
+            this.clientId = getHostIpLockClientId();
+        } else {
+            this.clientId = clientId;
+        }
 
         // Build resources
         StatsLogger schedulerStatsLogger = statsLogger.scope("factory").scope("thread_pool");

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/f4f633fe/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index a84261a..3b991e2 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -56,7 +56,6 @@ import scala.runtime.AbstractFunction0;
 import scala.runtime.BoxedUnit;
 
 import java.io.IOException;
-import java.net.InetAddress;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -274,12 +273,7 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         LOG.debug("Using ZK Path {}", logMetadata.getLogRootPath());
         this.bookKeeperClient = bkcBuilder.build();
         this.metadataStore = metadataStore;
-
-        if (lockClientId.equals(DistributedLogConstants.UNKNOWN_CLIENT_ID)) {
-            this.lockClientId = getHostIpLockClientId();
-        } else {
-            this.lockClientId = lockClientId;
-        }
+        this.lockClientId = lockClientId;
 
         this.getChildrenWatcher = this.zooKeeperClient.getWatcherManager()
                 .registerChildWatcher(logMetadata.getLogSegmentsPath(), this);
@@ -316,14 +310,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         return lockClientId;
     }
 
-    private String getHostIpLockClientId() {
-        try {
-            return InetAddress.getLocalHost().toString();
-        } catch(Exception ex) {
-            return DistributedLogConstants.UNKNOWN_CLIENT_ID;
-        }
-    }
-
     protected void registerListener(LogSegmentListener listener) {
         listeners.add(listener);
     }


[09/29] incubator-distributedlog git commit: Be able to close the writer within a timeout period

Posted by si...@apache.org.
Be able to close the writer within a timeout period

RB_ID=841340


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/0a18f564
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/0a18f564
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/0a18f564

Branch: refs/heads/merge/DL-98
Commit: 0a18f564ba7e4d03f4c6a0859e5478535a59befd
Parents: 800b867
Author: Leigh Stewart <ls...@twitter.com>
Authored: Mon Dec 12 16:46:11 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:46:11 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKAsyncLogWriter.java        | 33 +++++++++++++-------
 .../distributedlog/util/FutureUtils.java        | 20 +++++++++++-
 .../service/config/ServerConfiguration.java     | 25 +++++++++++++++
 .../service/stream/StreamFactoryImpl.java       |  6 +++-
 .../service/stream/StreamImpl.java              | 32 +++++++++++++++++--
 5 files changed, 99 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
index f1594f9..79f5f5e 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKAsyncLogWriter.java
@@ -212,8 +212,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri
                                                            boolean rollLog,
                                                            boolean allowMaxTxID) {
         Stopwatch stopwatch = Stopwatch.createStarted();
-        return doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, allowMaxTxID)
-                .addEventListener(new OpStatsListener<BKLogSegmentWriter>(getWriterOpStatsLogger, stopwatch));
+        return FutureUtils.stats(
+                doGetLogSegmentWriter(firstTxid, bestEffort, rollLog, allowMaxTxID),
+                getWriterOpStatsLogger,
+                stopwatch);
     }
 
     private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long firstTxid,
@@ -415,8 +417,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri
     @Override
     public Future<DLSN> write(final LogRecord record) {
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        return asyncWrite(record, true)
-                .addEventListener(new OpStatsListener<DLSN>(writeOpStatsLogger, stopwatch));
+        return FutureUtils.stats(
+                asyncWrite(record, true),
+                writeOpStatsLogger,
+                stopwatch);
     }
 
     /**
@@ -430,8 +434,10 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri
     @Override
     public Future<List<Future<DLSN>>> writeBulk(final List<LogRecord> records) {
         final Stopwatch stopwatch = Stopwatch.createStarted();
-        return Future.value(asyncWriteBulk(records))
-                .addEventListener(new OpStatsListener<List<Future<DLSN>>>(bulkWriteOpStatsLogger, stopwatch));
+        return FutureUtils.stats(
+                Future.value(asyncWriteBulk(records)),
+                bulkWriteOpStatsLogger,
+                stopwatch);
     }
 
     @Override
@@ -478,12 +484,15 @@ public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWri
             logSegmentWriterFuture = getLogSegmentWriterForEndOfStream();
         }
 
-        return logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
-            @Override
-            public Future<Long> apply(BKLogSegmentWriter w) {
-                return w.markEndOfStream();
-            }
-        }).addEventListener(new OpStatsListener<Long>(markEndOfStreamOpStatsLogger, stopwatch));
+        return FutureUtils.stats(
+                logSegmentWriterFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() {
+                    @Override
+                    public Future<Long> apply(BKLogSegmentWriter w) {
+                        return w.markEndOfStream();
+                    }
+                }),
+                markEndOfStreamOpStatsLogger,
+                stopwatch);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
index f0540d7..6a5f7a7 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/util/FutureUtils.java
@@ -17,13 +17,15 @@
  */
 package com.twitter.distributedlog.util;
 
-import com.twitter.distributedlog.exceptions.BKTransmitException;
+import com.google.common.base.Stopwatch;
 import com.twitter.distributedlog.DistributedLogConstants;
+import com.twitter.distributedlog.exceptions.BKTransmitException;
 import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.exceptions.DLInterruptedException;
 import com.twitter.distributedlog.exceptions.UnexpectedException;
 import com.twitter.distributedlog.exceptions.ZKException;
+import com.twitter.distributedlog.stats.OpStatsListener;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.Function;
@@ -34,6 +36,7 @@ import com.twitter.util.Promise;
 import com.twitter.util.Return;
 import com.twitter.util.Throw;
 import org.apache.bookkeeper.client.BKException;
+import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.zookeeper.KeeperException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -225,6 +228,21 @@ public class FutureUtils {
     }
 
     /**
+     * Add an event listener over <i>result</i> for collecting the operation stats.
+     *
+     * @param result result to listen on
+     * @param opStatsLogger stats logger to record operations stats
+     * @param stopwatch stop watch to time operation
+     * @param <T>
+     * @return result after the event listener is registered
+     */
+    public static <T> Future<T> stats(Future<T> result,
+                                      OpStatsLogger opStatsLogger,
+                                      Stopwatch stopwatch) {
+        return result.addEventListener(new OpStatsListener<T>(opStatsLogger, stopwatch));
+    }
+
+    /**
      * Await for the result of the future and thrown bk related exceptions.
      *
      * @param result future to wait for

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
index 90ff6e6..09661c0 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
@@ -79,6 +79,10 @@ public class ServerConfiguration extends CompositeConfiguration {
     public static final String SERVER_SERVICE_TIMEOUT_MS_OLD = "serviceTimeoutMs";
     public static final long SERVER_SERVICE_TIMEOUT_MS_DEFAULT = 0;
 
+    // Server close writer timeout
+    public static final String SERVER_WRITER_CLOSE_TIMEOUT_MS = "server_writer_close_timeout_ms";
+    public static final long SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT = 1000;
+
     // Server stream probation timeout
     public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms";
     public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs";
@@ -296,6 +300,27 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
+     * Get timeout for closing writer in proxy layer. 0 disables timeout.
+     *
+     * @return timeout for closing writer in proxy layer.
+     */
+    public long getWriterCloseTimeoutMs() {
+        return getLong(SERVER_WRITER_CLOSE_TIMEOUT_MS, SERVER_WRITER_CLOSE_TIMEOUT_MS_DEFAULT);
+    }
+
+    /**
+     * Set timeout for closing writer in proxy layer. 0 disables timeout.
+     *
+     * @param timeoutMs
+     *          timeout for closing writer in proxy layer.
+     * @return dl configuration.
+     */
+    public ServerConfiguration setWriterCloseTimeoutMs(long timeoutMs) {
+        setProperty(SERVER_WRITER_CLOSE_TIMEOUT_MS, timeoutMs);
+        return this;
+    }
+
+    /**
      * After service timeout, how long should stream be kept in cache in probationary state in order
      * to prevent reacquire. In millisec.
      *

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
index bc53fe3..cb28f1e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
@@ -25,6 +25,7 @@ import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.config.StreamConfigProvider;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.util.OrderedScheduler;
+import com.twitter.util.Timer;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.jboss.netty.util.HashedWheelTimer;
 
@@ -40,6 +41,7 @@ public class StreamFactoryImpl implements StreamFactory {
     private final OrderedScheduler scheduler;
     private final FatalErrorHandler fatalErrorHandler;
     private final HashedWheelTimer requestTimer;
+    private final Timer futureTimer;
 
     public StreamFactoryImpl(String clientId,
         StreamOpStats streamOpStats,
@@ -64,6 +66,7 @@ public class StreamFactoryImpl implements StreamFactory {
         this.scheduler = scheduler;
         this.fatalErrorHandler = fatalErrorHandler;
         this.requestTimer = requestTimer;
+        this.futureTimer = new com.twitter.finagle.util.HashedWheelTimer(requestTimer);
     }
 
     @Override
@@ -83,6 +86,7 @@ public class StreamFactoryImpl implements StreamFactory {
             dlNamespace,
             scheduler,
             fatalErrorHandler,
-            requestTimer);
+            requestTimer,
+            futureTimer);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/0a18f564/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 45630fe..1204d39 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -45,10 +45,13 @@ import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
 import com.twitter.distributedlog.util.TimeSequencer;
 import com.twitter.distributedlog.util.Utils;
+import com.twitter.util.Duration;
 import com.twitter.util.Function0;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
+import com.twitter.util.TimeoutException;
+import com.twitter.util.Timer;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.Counter;
@@ -141,9 +144,11 @@ public class StreamImpl implements Stream {
     private final StreamConfigProvider streamConfigProvider;
     private final FatalErrorHandler fatalErrorHandler;
     private final long streamProbationTimeoutMs;
-    private long serviceTimeoutMs;
+    private final long serviceTimeoutMs;
+    private final long writerCloseTimeoutMs;
     private final boolean failFastOnStreamNotReady;
     private final HashedWheelTimer requestTimer;
+    private final Timer futureTimer;
 
     // Stats
     private final StatsLogger streamLogger;
@@ -151,8 +156,10 @@ public class StreamImpl implements Stream {
     private final StatsLogger limiterStatLogger;
     private final Counter serviceTimeout;
     private final OpStatsLogger streamAcquireStat;
+    private final OpStatsLogger writerCloseStatLogger;
     private final Counter pendingOpsCounter;
     private final Counter unexpectedExceptions;
+    private final Counter writerCloseTimeoutCounter;
     private final StatsLogger exceptionStatLogger;
     private final ConcurrentHashMap<String, Counter> exceptionCounters =
         new ConcurrentHashMap<String, Counter>();
@@ -173,7 +180,8 @@ public class StreamImpl implements Stream {
                DistributedLogNamespace dlNamespace,
                OrderedScheduler scheduler,
                FatalErrorHandler fatalErrorHandler,
-               HashedWheelTimer requestTimer) {
+               HashedWheelTimer requestTimer,
+               Timer futureTimer) {
         this.clientId = clientId;
         this.dlConfig = dlConfig;
         this.streamManager = streamManager;
@@ -189,6 +197,7 @@ public class StreamImpl implements Stream {
         this.scheduler = scheduler;
         this.serviceTimeoutMs = serverConfig.getServiceTimeoutMs();
         this.streamProbationTimeoutMs = serverConfig.getStreamProbationTimeoutMs();
+        this.writerCloseTimeoutMs = serverConfig.getWriterCloseTimeoutMs();
         this.failFastOnStreamNotReady = dlConfig.getFailFastOnStreamNotReady();
         this.fatalErrorHandler = fatalErrorHandler;
         this.dynConf = streamConf;
@@ -197,6 +206,7 @@ public class StreamImpl implements Stream {
             streamOpStats.streamRequestScope(partition, "limiter"));
         this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled);
         this.requestTimer = requestTimer;
+        this.futureTimer = futureTimer;
 
         // Stats
         this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
@@ -208,6 +218,8 @@ public class StreamImpl implements Stream {
         this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops");
         this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions");
         this.exceptionStatLogger = streamOpStats.requestScope("exceptions");
+        this.writerCloseStatLogger = streamsStatsLogger.getOpStatsLogger("writer_close");
+        this.writerCloseTimeoutCounter = streamsStatsLogger.getCounter("writer_close_timeouts");
     }
 
     @Override
@@ -953,7 +965,18 @@ public class StreamImpl implements Stream {
             closeWriterFuture = Utils.asyncClose(writer, true);
         }
         // close the manager and error out pending requests after close writer
-        closeWriterFuture.addEventListener(FutureUtils.OrderedFutureEventListener.of(
+        Duration closeWaitDuration;
+        if (writerCloseTimeoutMs <= 0) {
+            closeWaitDuration = Duration.Top();
+        } else {
+            closeWaitDuration = Duration.fromMilliseconds(writerCloseTimeoutMs);
+        }
+        FutureUtils.stats(
+                closeWriterFuture,
+                writerCloseStatLogger,
+                Stopwatch.createStarted()
+        ).masked().within(futureTimer, closeWaitDuration)
+                .addEventListener(FutureUtils.OrderedFutureEventListener.of(
                 new FutureEventListener<Void>() {
                     @Override
                     public void onSuccess(Void value) {
@@ -962,6 +985,9 @@ public class StreamImpl implements Stream {
                     }
                     @Override
                     public void onFailure(Throwable cause) {
+                        if (cause instanceof TimeoutException) {
+                            writerCloseTimeoutCounter.inc();
+                        }
                         closeManagerAndErrorOutPendingRequests();
                         FutureUtils.setValue(closePromise, null);
                     }


[16/29] incubator-distributedlog git commit: Remove gauge when the log segment writer is removed

Posted by si...@apache.org.
Remove gauge when the log segment writer is removed

RB_ID=849452


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/bc70b770
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/bc70b770
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/bc70b770

Branch: refs/heads/merge/DL-98
Commit: bc70b7706e2a5cf2af26ab7e92ecf5ec2b1a955e
Parents: b7ae590
Author: Sijie Guo <si...@twitter.com>
Authored: Mon Jul 11 11:11:15 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 18:27:37 2016 -0800

----------------------------------------------------------------------
 .../distributedlog/BKLogSegmentWriter.java      | 12 ++++++++---
 .../stats/BroadCastStatsLogger.java             | 22 ++++++++++++++++++++
 pom.xml                                         |  2 +-
 3 files changed, 32 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bc70b770/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
index 1b52951..8276125 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogSegmentWriter.java
@@ -156,8 +156,10 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
 
     // stats
     private final StatsLogger envelopeStatsLogger;
+    private final StatsLogger transmitOutstandingLogger;
     private final Counter transmitDataSuccesses;
     private final Counter transmitDataMisses;
+    private final Gauge<Number> transmitOutstandingGauge;
     private final OpStatsLogger transmitDataPacketSize;
     private final Counter transmitControlSuccesses;
     private final Counter pFlushSuccesses;
@@ -255,8 +257,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
         pendingWrites = segWriterStatsLogger.getCounter("pending");
 
         // outstanding transmit requests
-        StatsLogger transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
-        transmitOutstandingLogger.registerGauge("requests", new Gauge<Number>() {
+        transmitOutstandingLogger = perLogStatsLogger.scope("transmit").scope("outstanding");
+        transmitOutstandingGauge = new Gauge<Number>() {
             @Override
             public Number getDefaultValue() {
                 return 0;
@@ -265,7 +267,8 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
             public Number getSample() {
                 return outstandingTransmits.get();
             }
-        });
+        };
+        transmitOutstandingLogger.registerGauge("requests", transmitOutstandingGauge);
 
         outstandingTransmits = new AtomicInteger(0);
         this.fullyQualifiedLogSegment = streamName + ":" + logSegmentName;
@@ -531,6 +534,9 @@ class BKLogSegmentWriter implements LogSegmentWriter, AddCallback, Runnable, Siz
     private void closeInternal(final boolean abort,
                                final AtomicReference<Throwable> throwExc,
                                final Promise<Void> closePromise) {
+        // remove stats
+        this.transmitOutstandingLogger.unregisterGauge("requests", transmitOutstandingGauge);
+
         // Cancel the periodic keep alive schedule first
         if (null != periodicKeepAliveSchedule) {
             if (!periodicKeepAliveSchedule.cancel(false)) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bc70b770/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
index e29cc47..10a7011 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/stats/BroadCastStatsLogger.java
@@ -133,9 +133,26 @@ public class BroadCastStatsLogger {
         }
 
         @Override
+        public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+            // no-op
+        }
+
+        @Override
         public StatsLogger scope(final String scope) {
             return new Two(first.scope(scope), second.scope(scope));
         }
+
+        @Override
+        public void removeScope(String scope, StatsLogger statsLogger) {
+            if (!(statsLogger instanceof Two)) {
+                return;
+            }
+
+            Two another = (Two) statsLogger;
+
+            first.removeScope(scope, another.first);
+            second.removeScope(scope, another.second);
+        }
     }
 
     /**
@@ -165,6 +182,11 @@ public class BroadCastStatsLogger {
         }
 
         @Override
+        public <T extends Number> void unregisterGauge(String statName, Gauge<T> gauge) {
+            first.unregisterGauge(statName, gauge);
+        }
+
+        @Override
         public StatsLogger scope(String scope) {
             return new MasterSlave(first.scope(scope), second.scope(scope));
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/bc70b770/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index db07f12..1c70aa6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -87,7 +87,7 @@
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
     <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
     <zookeeper.version>3.5.1-alpha</zookeeper.version>
-    <bookkeeper.version>4.3.5-TWTTR-OSS</bookkeeper.version>
+    <bookkeeper.version>4.3.6-TWTTR-OSS</bookkeeper.version>
     <birdcage.sha>6.34.0</birdcage.sha>
     <scrooge.version>4.6.0</scrooge.version>
     <scrooge-maven-plugin.version>3.17.0</scrooge-maven-plugin.version>


[29/29] incubator-distributedlog git commit: Merge branch 'merge/DL-97' into merge/DL-98

Posted by si...@apache.org.
Merge branch 'merge/DL-97' into merge/DL-98


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/28a8b8ff
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/28a8b8ff
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/28a8b8ff

Branch: refs/heads/merge/DL-98
Commit: 28a8b8ff9a01d2862fcedb37bc0f6ce1dc3e1a19
Parents: 9709f9f 83a4e09
Author: Sijie Guo <si...@twitter.com>
Authored: Tue Dec 20 17:04:32 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Tue Dec 20 17:04:32 2016 -0800

----------------------------------------------------------------------
 dev-support/DL_formatter.xml                    | 310 -------------
 distributedlog-build-tools/pom.xml              |  30 ++
 .../resources/distributedlog/checkstyle.xml     | 443 +++++++++++++++++++
 .../resources/distributedlog/suppressions.xml   |  29 ++
 .../main/resources/ide/eclipse/DL_formatter.xml | 310 +++++++++++++
 distributedlog-protocol/pom.xml                 |  33 ++
 .../java/com/twitter/distributedlog/DLSN.java   |  67 +--
 .../EnvelopedRecordSetReader.java               |  14 +-
 .../EnvelopedRecordSetWriter.java               |  25 +-
 .../com/twitter/distributedlog/LogRecord.java   |  75 ++--
 .../twitter/distributedlog/LogRecordSet.java    |  23 +-
 .../distributedlog/LogRecordWithDLSN.java       |  26 +-
 .../twitter/distributedlog/RecordStream.java    |   6 +-
 .../annotations/DistributedLogAnnotations.java  |   3 +
 .../annotations/package-info.java               |  21 +
 .../exceptions/AlreadyClosedException.java      |   6 +-
 .../AlreadyTruncatedTransactionException.java   |   4 +-
 .../exceptions/BKTransmitException.java         |   6 +-
 .../exceptions/ChecksumFailedException.java     |   3 +
 .../exceptions/DLClientClosedException.java     |   3 +
 .../distributedlog/exceptions/DLException.java  |   4 +-
 .../exceptions/DLIllegalStateException.java     |   3 +
 .../exceptions/EndOfStreamException.java        |   3 +
 .../exceptions/FlushException.java              |   4 +-
 .../exceptions/IdleReaderException.java         |   5 +
 .../exceptions/InternalServerException.java     |   3 +
 .../InvalidEnvelopedEntryException.java         |   2 +-
 .../exceptions/InvalidStreamNameException.java  |   3 +
 .../exceptions/LockCancelledException.java      |   3 +
 .../exceptions/LockingException.java            |   4 +-
 .../exceptions/LogEmptyException.java           |   4 +-
 .../exceptions/LogExistsException.java          |   2 +-
 .../exceptions/LogNotFoundException.java        |   4 +-
 .../exceptions/LogReadException.java            |   1 -
 .../exceptions/LogRecordTooLongException.java   |   5 +
 .../exceptions/MetadataException.java           |   3 +
 .../exceptions/NotYetImplementedException.java  |   3 +
 .../exceptions/OverCapacityException.java       |   6 +
 .../OwnershipAcquireFailedException.java        |   5 +
 .../exceptions/ReadCancelledException.java      |   3 +
 .../exceptions/RegionUnavailableException.java  |   5 +
 .../exceptions/RequestDeniedException.java      |   3 +
 .../exceptions/RetryableReadException.java      |   3 +
 .../exceptions/ServiceUnavailableException.java |   3 +
 .../exceptions/StreamNotReadyException.java     |   3 +
 .../exceptions/StreamUnavailableException.java  |   3 +
 .../exceptions/TooManyStreamsException.java     |   3 +
 .../TransactionIdOutOfOrderException.java       |   3 +
 .../exceptions/UnexpectedException.java         |   3 +
 .../UnsupportedMetadataVersionException.java    |   3 +
 .../exceptions/WriteCancelledException.java     |  11 +-
 .../exceptions/WriteException.java              |   6 +-
 .../distributedlog/exceptions/package-info.java |  21 +
 .../distributedlog/io/CompressionCodec.java     |   5 +-
 .../distributedlog/io/CompressionUtils.java     |  15 +-
 .../io/IdentityCompressionCodec.java            |  15 +-
 .../distributedlog/io/LZ4CompressionCodec.java  |  37 +-
 .../twitter/distributedlog/io/package-info.java |  21 +
 .../twitter/distributedlog/package-info.java    |  21 +
 .../distributedlog/util/BitMaskUtils.java       |  13 +-
 .../distributedlog/util/ProtocolUtils.java      |   5 +-
 .../distributedlog/util/package-info.java       |  21 +
 .../com/twitter/distributedlog/TestDLSN.java    |   7 +-
 .../distributedlog/TestLogRecordSet.java        |  17 +-
 pom.xml                                         |   1 +
 65 files changed, 1267 insertions(+), 488 deletions(-)
----------------------------------------------------------------------



[21/29] incubator-distributedlog git commit: Merge branch 'merge/DL-92' into merge/DL-93

Posted by si...@apache.org.
Merge branch 'merge/DL-92' into merge/DL-93


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/9613f40d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/9613f40d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/9613f40d

Branch: refs/heads/merge/DL-98
Commit: 9613f40dec6eeecdb3461d8b997520db76c425b4
Parents: 72a786e adc8ad1
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:52:19 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:52:19 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 .travis.yml                                     |   4 +
 Dockerfile                                      |  10 +-
 Vagrantfile                                     |   5 +-
 distributedlog-benchmark/bin/bundle             |   7 +-
 distributedlog-benchmark/bin/dbench             |  15 +-
 distributedlog-benchmark/conf/dlogenv.sh        |   2 +-
 .../routing/ConsistentHashRoutingService.java   |   2 +-
 distributedlog-core/bin/dlog                    |  12 +-
 distributedlog-core/conf/write_proxy.conf       |   2 +-
 .../distributedlog/DistributedLogConstants.java |   2 +-
 distributedlog-service/bin/bundle               |   7 +-
 distributedlog-service/bin/dlog                 |  10 +-
 distributedlog-service/bin/dlog-daemon.sh       | 170 +++++++++----------
 .../distributedlog-kafka/bin/runner             |   1 -
 docker/Dockerfile                               |  35 ----
 docs/admin_guide/vagrant.rst                    |   4 +-
 docs/basics/introduction.rst                    |  14 +-
 docs/deployment/cluster.rst                     |  58 ++++---
 docs/deployment/docker.rst                      |   2 +-
 pom.xml                                         |  11 +-
 scripts/bundle                                  |   2 -
 scripts/common.sh                               |  10 +-
 scripts/integration/smoketest.sh                | 113 ++++++++++++
 scripts/runner                                  |  12 +-
 scripts/snapshot                                |  11 +-
 vagrant/base.sh                                 |  43 ++---
 vagrant/bk.sh                                   |  64 +++----
 vagrant/zk.sh                                   |   5 +-
 29 files changed, 362 insertions(+), 275 deletions(-)
----------------------------------------------------------------------



[19/29] incubator-distributedlog git commit: Merge branch 'merge/DL-90' into merge/DL-91

Posted by si...@apache.org.
Merge branch 'merge/DL-90' into merge/DL-91


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/4fc6f3d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/4fc6f3d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/4fc6f3d2

Branch: refs/heads/merge/DL-98
Commit: 4fc6f3d271dc440f324fc537e024fe43d2c285e1
Parents: 0a18f56 76be38e
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:51:17 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:51:17 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 .travis.yml                                     |   4 +
 Dockerfile                                      |  10 +-
 Vagrantfile                                     |   5 +-
 distributedlog-benchmark/bin/bundle             |   7 +-
 distributedlog-benchmark/bin/dbench             |  15 +-
 distributedlog-benchmark/conf/dlogenv.sh        |   2 +-
 .../routing/ConsistentHashRoutingService.java   |   2 +-
 distributedlog-core/bin/dlog                    |  12 +-
 distributedlog-core/conf/write_proxy.conf       |   2 +-
 .../distributedlog/DistributedLogConstants.java |   2 +-
 distributedlog-service/bin/bundle               |   7 +-
 distributedlog-service/bin/dlog                 |  10 +-
 distributedlog-service/bin/dlog-daemon.sh       | 170 +++++++++----------
 .../distributedlog-kafka/bin/runner             |   1 -
 docker/Dockerfile                               |  35 ----
 docs/admin_guide/vagrant.rst                    |   4 +-
 docs/basics/introduction.rst                    |  14 +-
 docs/deployment/cluster.rst                     |  58 ++++---
 docs/deployment/docker.rst                      |   2 +-
 pom.xml                                         |  11 +-
 scripts/bundle                                  |   2 -
 scripts/common.sh                               |  10 +-
 scripts/integration/smoketest.sh                | 113 ++++++++++++
 scripts/runner                                  |  12 +-
 scripts/snapshot                                |  11 +-
 vagrant/base.sh                                 |  43 ++---
 vagrant/bk.sh                                   |  64 +++----
 vagrant/zk.sh                                   |   5 +-
 29 files changed, 362 insertions(+), 275 deletions(-)
----------------------------------------------------------------------



[02/29] incubator-distributedlog git commit: use zero padded partition strings in stats

Posted by si...@apache.org.
use zero padded partition strings in stats

RB_ID=833803


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/00919605
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/00919605
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/00919605

Branch: refs/heads/merge/DL-98
Commit: 00919605f1bfa3f3e0c8514dc63552df4eeb3cc2
Parents: 904b898
Author: Jordan Bull <jb...@twitter.com>
Authored: Wed May 18 10:18:34 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:17:02 2016 -0800

----------------------------------------------------------------------
 .../twitter/distributedlog/service/stream/StreamOpStats.java | 5 +++--
 .../twitter/distributedlog/service/streamset/Partition.java  | 8 ++++++++
 2 files changed, 11 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/00919605/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
index 2a44d88..bfbc88c 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
@@ -85,8 +85,9 @@ public class StreamOpStats {
 
     public StatsLogger streamRequestStatsLogger(Partition partition) {
         return BroadCastStatsLogger.masterslave(
-            streamStatsLogger.scope(partition.getStream()).scope("partition").scope(Integer.toString(partition.getId())),
-            streamStatsLogger.scope(partition.getStream()).scope("aggregate"));
+            streamStatsLogger.scope(partition.getStream()).scope("partition")
+                .scope(partition.getPaddedId()), streamStatsLogger.scope(partition.getStream())
+                .scope("aggregate"));
     }
 
     public StatsLogger streamRequestScope(Partition partition, String scopeName) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/00919605/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
index f207eee..d199f88 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
@@ -57,6 +57,14 @@ public class Partition {
         return id;
     }
 
+    /**
+     * Get the 6 digit 0 padded id of this partition as a String.
+     * @return partition id
+     */
+    public String getPaddedId() {
+        return String.format("%06d", getId());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {


[25/29] incubator-distributedlog git commit: Merge branch 'merge/DL-96' into merge/DL-97

Posted by si...@apache.org.
Merge branch 'merge/DL-96' into merge/DL-97


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b8e4c146
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b8e4c146
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b8e4c146

Branch: refs/heads/merge/DL-98
Commit: b8e4c1462a107345f87eb48f82b0ee76f9fc4e4d
Parents: bc70b77 a1a3516
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:54:12 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:54:12 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 .travis.yml                                     |   4 +
 Dockerfile                                      |  10 +-
 Vagrantfile                                     |   5 +-
 distributedlog-benchmark/bin/bundle             |   7 +-
 distributedlog-benchmark/bin/dbench             |  15 +-
 distributedlog-benchmark/conf/dlogenv.sh        |   2 +-
 .../routing/ConsistentHashRoutingService.java   |   2 +-
 distributedlog-core/bin/dlog                    |  12 +-
 distributedlog-core/conf/write_proxy.conf       |   2 +-
 .../distributedlog/DistributedLogConstants.java |   2 +-
 distributedlog-service/bin/bundle               |   7 +-
 distributedlog-service/bin/dlog                 |  10 +-
 distributedlog-service/bin/dlog-daemon.sh       | 170 +++++++++----------
 .../distributedlog-kafka/bin/runner             |   1 -
 docker/Dockerfile                               |  35 ----
 docs/admin_guide/vagrant.rst                    |   4 +-
 docs/basics/introduction.rst                    |  14 +-
 docs/deployment/cluster.rst                     |  58 ++++---
 docs/deployment/docker.rst                      |   2 +-
 pom.xml                                         |  11 +-
 scripts/bundle                                  |   2 -
 scripts/common.sh                               |  10 +-
 scripts/integration/smoketest.sh                | 113 ++++++++++++
 scripts/runner                                  |  12 +-
 scripts/snapshot                                |  11 +-
 vagrant/base.sh                                 |  43 ++---
 vagrant/bk.sh                                   |  64 +++----
 vagrant/zk.sh                                   |   5 +-
 29 files changed, 362 insertions(+), 275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b8e4c146/pom.xml
----------------------------------------------------------------------


[12/29] incubator-distributedlog git commit: LogSegmentMetadataStore should only notify when the list of log segments is updated

Posted by si...@apache.org.
LogSegmentMetadataStore should only notify when the list of log segments is updated

Currently it notifies the listeners not only when there is a change but also when the session expires. This breaks the readahead loop and forces readers to wait until it is able to connect to ZooKeeper again.

With this change, it only notifies the listeners when the list of log segments is updated. If it disconnects from ZooKeeper, the listener won't be notified and it keeps reading from the log segments it already knows.

RB_ID=842998


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/72a786e7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/72a786e7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/72a786e7

Branch: refs/heads/merge/DL-98
Commit: 72a786e78540fb5a3f8e27f9d71d3616d06d1548
Parents: f008f75
Author: Sijie Guo <si...@twitter.com>
Authored: Mon Jul 11 10:10:55 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:52:18 2016 -0800

----------------------------------------------------------------------
 .../impl/ZKLogSegmentMetadataStore.java         | 94 ++++++++++++++++----
 .../impl/TestZKLogSegmentMetadataStore.java     | 22 ++---
 2 files changed, 84 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/72a786e7/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
index c0796a1..cb53b23 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/impl/ZKLogSegmentMetadataStore.java
@@ -17,6 +17,7 @@
  */
 package com.twitter.distributedlog.impl;
 
+import com.google.common.collect.ImmutableList;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.LogSegmentMetadata;
 import com.twitter.distributedlog.ZooKeeperClient;
@@ -45,10 +46,14 @@ import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.runtime.AbstractFunction1;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -64,7 +69,9 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
 
     private static final Logger logger = LoggerFactory.getLogger(ZKLogSegmentMetadataStore.class);
 
-    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<List<String>> {
+    private static final List<String> EMPTY_LIST = ImmutableList.of();
+
+    private static class ReadLogSegmentsTask implements Runnable, FutureEventListener<Versioned<List<String>>> {
 
         private final String logSegmentsPath;
         private final ZKLogSegmentMetadataStore store;
@@ -78,15 +85,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         }
 
         @Override
-        public void onSuccess(final List<String> segments) {
+        public void onSuccess(final Versioned<List<String>> segments) {
             // reset the back off after a successful operation
             currentZKBackOffMs = store.minZKBackoffMs;
-            final Set<LogSegmentNamesListener> listenerSet = store.listeners.get(logSegmentsPath);
+            final Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+                    store.listeners.get(logSegmentsPath);
             if (null != listenerSet) {
                 store.submitTask(logSegmentsPath, new Runnable() {
                     @Override
                     public void run() {
-                        for (LogSegmentNamesListener listener : listenerSet) {
+                        for (VersionedLogSegmentNamesListener listener : listenerSet.values()) {
                             listener.onSegmentsUpdated(segments);
                         }
                     }
@@ -120,6 +128,48 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
         }
     }
 
+    /**
+     * A log segment names listener that keeps track of the version of the log segment list it was last notified of.
+     * It only notifies the wrapped listener of newer log segment lists.
+     */
+    static class VersionedLogSegmentNamesListener {
+
+        private final LogSegmentNamesListener listener;
+        private Versioned<List<String>> lastNotifiedLogSegments;
+
+        VersionedLogSegmentNamesListener(LogSegmentNamesListener listener) {
+            this.listener = listener;
+            this.lastNotifiedLogSegments = new Versioned<List<String>>(EMPTY_LIST, Version.NEW);
+        }
+
+        synchronized void onSegmentsUpdated(Versioned<List<String>> logSegments) {
+            if (lastNotifiedLogSegments.getVersion() == Version.NEW ||
+                    lastNotifiedLogSegments.getVersion().compare(logSegments.getVersion()) == Version.Occurred.BEFORE) {
+                lastNotifiedLogSegments = logSegments;
+                listener.onSegmentsUpdated(logSegments.getValue());
+            }
+        }
+
+        @Override
+        public int hashCode() {
+            return listener.hashCode();
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof VersionedLogSegmentNamesListener)) {
+                return false;
+            }
+            VersionedLogSegmentNamesListener other = (VersionedLogSegmentNamesListener) obj;
+            return listener.equals(other.listener);
+        }
+
+        @Override
+        public String toString() {
+            return listener.toString();
+        }
+    }
+
     final DistributedLogConfiguration conf;
     // settings
     final int minZKBackoffMs;
@@ -128,7 +178,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
 
     final ZooKeeperClient zkc;
     // log segment listeners
-    final ConcurrentMap<String, Set<LogSegmentNamesListener>> listeners;
+    final ConcurrentMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>> listeners;
     // scheduler
     final OrderedScheduler scheduler;
     final ReentrantReadWriteLock closeLock;
@@ -139,7 +189,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
                                      OrderedScheduler scheduler) {
         this.conf = conf;
         this.zkc = zkc;
-        this.listeners = new ConcurrentHashMap<String, Set<LogSegmentNamesListener>>();
+        this.listeners =
+                new ConcurrentHashMap<String, Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener>>();
         this.scheduler = scheduler;
         this.closeLock = new ReentrantReadWriteLock();
         // settings
@@ -275,11 +326,16 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
 
     @Override
     public Future<List<String>> getLogSegmentNames(String logSegmentsPath) {
-        return getLogSegmentNames(logSegmentsPath, null);
+        return getLogSegmentNames(logSegmentsPath, null).map(new AbstractFunction1<Versioned<List<String>>, List<String>>() {
+            @Override
+            public List<String> apply(Versioned<List<String>> list) {
+                return list.getValue();
+            }
+        });
     }
 
-    Future<List<String>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) {
-        Promise<List<String>> result = new Promise<List<String>>();
+    Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath, Watcher watcher) {
+        Promise<Versioned<List<String>>> result = new Promise<Versioned<List<String>>>();
         try {
             zkc.get().getChildren(logSegmentsPath, watcher, this, result);
         } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
@@ -293,9 +349,11 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
     @Override
     @SuppressWarnings("unchecked")
     public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) {
-        Promise<List<String>> result = ((Promise<List<String>>) ctx);
+        Promise<Versioned<List<String>>> result = ((Promise<Versioned<List<String>>>) ctx);
         if (KeeperException.Code.OK.intValue() == rc) {
-            result.setValue(children);
+            /** cversion: the number of changes to the children of this znode **/
+            ZkVersion zkVersion = new ZkVersion(stat.getCversion());
+            result.setValue(new Versioned(children, zkVersion));
         } else {
             result.setException(KeeperException.create(KeeperException.Code.get(rc)));
         }
@@ -312,10 +370,13 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
             if (closed) {
                 return;
             }
-            Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath);
+            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+                    listeners.get(logSegmentsPath);
             if (null == listenerSet) {
-                Set<LogSegmentNamesListener> newListenerSet = new HashSet<LogSegmentNamesListener>();
-                Set<LogSegmentNamesListener> oldListenerSet = listeners.putIfAbsent(logSegmentsPath, newListenerSet);
+                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> newListenerSet =
+                        new HashMap<LogSegmentNamesListener, VersionedLogSegmentNamesListener>();
+                Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> oldListenerSet =
+                        listeners.putIfAbsent(logSegmentsPath, newListenerSet);
                 if (null != oldListenerSet) {
                     listenerSet = oldListenerSet;
                 } else {
@@ -323,7 +384,7 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
                 }
             }
             synchronized (listenerSet) {
-                listenerSet.add(listener);
+                listenerSet.put(listener, new VersionedLogSegmentNamesListener(listener));
                 if (!listeners.containsKey(logSegmentsPath)) {
                     // listener set has been removed, add it back
                     listeners.put(logSegmentsPath, listenerSet);
@@ -343,7 +404,8 @@ public class ZKLogSegmentMetadataStore implements LogSegmentMetadataStore, Watch
             if (closed) {
                 return;
             }
-            Set<LogSegmentNamesListener> listenerSet = listeners.get(logSegmentsPath);
+            Map<LogSegmentNamesListener, VersionedLogSegmentNamesListener> listenerSet =
+                    listeners.get(logSegmentsPath);
             if (null == listenerSet) {
                 return;
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/72a786e7/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
index e4c774b..f8fd3eb 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/impl/TestZKLogSegmentMetadataStore.java
@@ -367,7 +367,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -429,7 +429,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -496,7 +496,7 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         lsmStore.registerLogSegmentListener(rootPath, listener);
         assertEquals(1, lsmStore.listeners.size());
         assertTrue("Should contain listener", lsmStore.listeners.containsKey(rootPath));
-        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).contains(listener));
+        assertTrue("Should contain listener", lsmStore.listeners.get(rootPath).containsKey(listener));
         while (numNotifications.get() < 1) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
@@ -510,16 +510,6 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         ZooKeeperClientUtils.expireSession(zkc,
                 DLUtils.getZKServersFromDLUri(uri), conf.getZKSessionTimeoutMilliseconds());
 
-        while (numNotifications.get() < 2) {
-            TimeUnit.MILLISECONDS.sleep(10);
-        }
-        assertEquals("Should receive second segment list update",
-                2, numNotifications.get());
-        List<String> secondSegmentList = segmentLists.get(1);
-        Collections.sort(secondSegmentList);
-        assertEquals("List of segments should be same",
-                children, secondSegmentList);
-
         logger.info("Create another {} segments.", numSegments);
 
         // create another log segment, it should trigger segment list updated
@@ -532,12 +522,12 @@ public class TestZKLogSegmentMetadataStore extends TestDistributedLogBase {
         List<String> newChildren = zkc.get().getChildren(rootPath, false);
         Collections.sort(newChildren);
         logger.info("All log segments become {}", newChildren);
-        while (numNotifications.get() < 3) {
+        while (numNotifications.get() < 2) {
             TimeUnit.MILLISECONDS.sleep(10);
         }
         assertEquals("Should receive third segment list update",
-                3, numNotifications.get());
-        List<String> thirdSegmentList = segmentLists.get(2);
+                2, numNotifications.get());
+        List<String> thirdSegmentList = segmentLists.get(1);
         Collections.sort(thirdSegmentList);
         assertEquals("List of segments should be updated",
                 2 * numSegments, thirdSegmentList.size());


[15/29] incubator-distributedlog git commit: Remove unused methods in BKLogHandler

Posted by si...@apache.org.
Remove unused methods in BKLogHandler


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b7ae590e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b7ae590e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b7ae590e

Branch: refs/heads/merge/DL-98
Commit: b7ae590e8b819ae63c0139611320dcc0d27e39f7
Parents: f4f633f
Author: Leigh Stewart <ls...@twitter.com>
Authored: Mon Dec 12 17:01:43 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 17:01:43 2016 -0800

----------------------------------------------------------------------
 .../twitter/distributedlog/BKLogHandler.java    | 107 -------------------
 1 file changed, 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b7ae590e/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
index 3b991e2..460de11 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKLogHandler.java
@@ -32,9 +32,7 @@ import com.twitter.distributedlog.io.AsyncCloseable;
 import com.twitter.distributedlog.logsegment.LogSegmentCache;
 import com.twitter.distributedlog.logsegment.LogSegmentFilter;
 import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
-import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.OrderedScheduler;
-import com.twitter.distributedlog.util.Utils;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
@@ -103,8 +101,6 @@ import java.util.concurrent.atomic.AtomicReference;
 public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbortable {
     static final Logger LOG = LoggerFactory.getLogger(BKLogHandler.class);
 
-    private static final int LAYOUT_VERSION = -1;
-
     protected final ZKLogMetadata logMetadata;
     protected final DistributedLogConfiguration conf;
     protected final ZooKeeperClient zooKeeperClient;
@@ -458,57 +454,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         }
     }
 
-    public LogRecordWithDLSN getLastLogRecord(boolean recover, boolean includeEndOfStream) throws IOException {
-        checkLogStreamExists();
-        List<LogSegmentMetadata> ledgerList = getFullLedgerListDesc(true, true);
-
-        for (LogSegmentMetadata metadata: ledgerList) {
-            LogRecordWithDLSN record = recoverLastRecordInLedger(metadata, recover, false, includeEndOfStream);
-
-            if (null != record) {
-                assert(!record.isControl());
-                LOG.debug("{} getLastLogRecord Returned {}", getFullyQualifiedName(), record);
-                return record;
-            }
-        }
-
-        throw new LogEmptyException("Log " + getFullyQualifiedName() + " has no records");
-    }
-
-    public long getLastTxId(boolean recover,
-                            boolean includeEndOfStream) throws IOException {
-        checkLogStreamExists();
-        return getLastLogRecord(recover, includeEndOfStream).getTransactionId();
-    }
-
-    public DLSN getLastDLSN(boolean recover,
-                            boolean includeEndOfStream) throws IOException {
-        checkLogStreamExists();
-        return getLastLogRecord(recover, includeEndOfStream).getDlsn();
-    }
-
-    public long getLogRecordCount() throws IOException {
-        try {
-            checkLogStreamExists();
-        } catch (LogNotFoundException exc) {
-            return 0;
-        }
-
-        List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, false);
-        long count = 0;
-        for (LogSegmentMetadata l : ledgerList) {
-            if (l.isInProgress()) {
-                LogRecord record = recoverLastRecordInLedger(l, false, false, false);
-                if (null != record) {
-                    count += record.getLastPositionWithinLogSegment();
-                }
-            } else {
-                count += l.getRecordCount();
-            }
-        }
-        return count;
-    }
-
     private Future<LogRecordWithDLSN> asyncReadFirstUserRecord(LogSegmentMetadata ledger, DLSN beginDLSN) {
         final LedgerHandleCache handleCache =
                 LedgerHandleCache.newBuilder().bkc(bookKeeperClient).conf(conf).build();
@@ -620,15 +565,6 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         return sum;
     }
 
-    public long getFirstTxId() throws IOException {
-        checkLogStreamExists();
-        List<LogSegmentMetadata> ledgerList = getFullLedgerList(true, true);
-
-        // The ledger list should at least have one element
-        // First TxId is populated even for in progress ledgers
-        return ledgerList.get(0).getFirstTxId();
-    }
-
     Future<Void> checkLogStreamExistsAsync() {
         final Promise<Void> promise = new Promise<Void>();
         try {
@@ -671,54 +607,11 @@ public abstract class BKLogHandler implements Watcher, AsyncCloseable, AsyncAbor
         return promise;
     }
 
-    private void checkLogStreamExists() throws IOException {
-        try {
-            if (null == Utils.sync(zooKeeperClient, logMetadata.getLogSegmentsPath())
-                    .exists(logMetadata.getLogSegmentsPath(), false)) {
-                throw new LogNotFoundException("Log " + getFullyQualifiedName() + " doesn't exist");
-            }
-        } catch (InterruptedException ie) {
-            LOG.error("Interrupted while reading {}", logMetadata.getLogSegmentsPath(), ie);
-            throw new DLInterruptedException("Interrupted while checking "
-                    + logMetadata.getLogSegmentsPath(), ie);
-        } catch (KeeperException ke) {
-            LOG.error("Error checking existence for {} : ", logMetadata.getLogSegmentsPath(), ke);
-            throw new ZKException("Error checking existence for " + getFullyQualifiedName() + " : ", ke);
-        }
-    }
-
     @Override
     public Future<Void> asyncAbort() {
         return asyncClose();
     }
 
-    /**
-     * Find the id of the last edit log transaction written to a edit log
-     * ledger.
-     */
-    protected Pair<Long, DLSN> readLastTxIdInLedger(LogSegmentMetadata l) throws IOException {
-        LogRecordWithDLSN record = recoverLastRecordInLedger(l, false, false, true);
-
-        if (null == record) {
-            return Pair.of(DistributedLogConstants.EMPTY_LOGSEGMENT_TX_ID, DLSN.InvalidDLSN);
-        }
-        else {
-            return Pair.of(record.getTransactionId(), record.getDlsn());
-        }
-    }
-
-    /**
-     * Find the id of the last edit log transaction written to a edit log
-     * ledger.
-     */
-    protected LogRecordWithDLSN recoverLastRecordInLedger(LogSegmentMetadata l,
-                                                          boolean fence,
-                                                          boolean includeControl,
-                                                          boolean includeEndOfStream)
-        throws IOException {
-        return FutureUtils.result(asyncReadLastRecord(l, fence, includeControl, includeEndOfStream));
-    }
-
     public Future<LogRecordWithDLSN> asyncReadLastUserRecord(final LogSegmentMetadata l) {
         return asyncReadLastRecord(l, false, false, false);
     }


[24/29] incubator-distributedlog git commit: Merge branch 'merge/DL-95' into merge/DL-96

Posted by si...@apache.org.
Merge branch 'merge/DL-95' into merge/DL-96


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/a1a35167
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/a1a35167
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/a1a35167

Branch: refs/heads/merge/DL-98
Commit: a1a351676be9864bbce1e9ba27deadfff79f6b02
Parents: b7ae590 749657f
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:53:50 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:53:50 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 .travis.yml                                     |   4 +
 Dockerfile                                      |  10 +-
 Vagrantfile                                     |   5 +-
 distributedlog-benchmark/bin/bundle             |   7 +-
 distributedlog-benchmark/bin/dbench             |  15 +-
 distributedlog-benchmark/conf/dlogenv.sh        |   2 +-
 .../routing/ConsistentHashRoutingService.java   |   2 +-
 distributedlog-core/bin/dlog                    |  12 +-
 distributedlog-core/conf/write_proxy.conf       |   2 +-
 .../distributedlog/DistributedLogConstants.java |   2 +-
 distributedlog-service/bin/bundle               |   7 +-
 distributedlog-service/bin/dlog                 |  10 +-
 distributedlog-service/bin/dlog-daemon.sh       | 170 +++++++++----------
 .../distributedlog-kafka/bin/runner             |   1 -
 docker/Dockerfile                               |  35 ----
 docs/admin_guide/vagrant.rst                    |   4 +-
 docs/basics/introduction.rst                    |  14 +-
 docs/deployment/cluster.rst                     |  58 ++++---
 docs/deployment/docker.rst                      |   2 +-
 pom.xml                                         |  11 +-
 scripts/bundle                                  |   2 -
 scripts/common.sh                               |  10 +-
 scripts/integration/smoketest.sh                | 113 ++++++++++++
 scripts/runner                                  |  12 +-
 scripts/snapshot                                |  11 +-
 vagrant/base.sh                                 |  43 ++---
 vagrant/bk.sh                                   |  64 +++----
 vagrant/zk.sh                                   |   5 +-
 29 files changed, 362 insertions(+), 275 deletions(-)
----------------------------------------------------------------------



[22/29] incubator-distributedlog git commit: Merge branch 'merge/DL-93' into merge/DL-94

Posted by si...@apache.org.
Merge branch 'merge/DL-93' into merge/DL-94


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/2879090c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/2879090c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/2879090c

Branch: refs/heads/merge/DL-98
Commit: 2879090cce120a35e1f5f1a476475f5bf83aba7b
Parents: f18fe17 9613f40
Author: Sijie Guo <si...@apache.org>
Authored: Fri Dec 16 23:52:45 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Fri Dec 16 23:52:45 2016 -0800

----------------------------------------------------------------------
 .gitignore                                      |   4 +
 .travis.yml                                     |   4 +
 Dockerfile                                      |  10 +-
 Vagrantfile                                     |   5 +-
 distributedlog-benchmark/bin/bundle             |   7 +-
 distributedlog-benchmark/bin/dbench             |  15 +-
 distributedlog-benchmark/conf/dlogenv.sh        |   2 +-
 .../routing/ConsistentHashRoutingService.java   |   2 +-
 distributedlog-core/bin/dlog                    |  12 +-
 distributedlog-core/conf/write_proxy.conf       |   2 +-
 .../distributedlog/DistributedLogConstants.java |   2 +-
 distributedlog-service/bin/bundle               |   7 +-
 distributedlog-service/bin/dlog                 |  10 +-
 distributedlog-service/bin/dlog-daemon.sh       | 170 +++++++++----------
 .../distributedlog-kafka/bin/runner             |   1 -
 docker/Dockerfile                               |  35 ----
 docs/admin_guide/vagrant.rst                    |   4 +-
 docs/basics/introduction.rst                    |  14 +-
 docs/deployment/cluster.rst                     |  58 ++++---
 docs/deployment/docker.rst                      |   2 +-
 pom.xml                                         |  11 +-
 scripts/bundle                                  |   2 -
 scripts/common.sh                               |  10 +-
 scripts/integration/smoketest.sh                | 113 ++++++++++++
 scripts/runner                                  |  12 +-
 scripts/snapshot                                |  11 +-
 vagrant/base.sh                                 |  43 ++---
 vagrant/bk.sh                                   |  64 +++----
 vagrant/zk.sh                                   |   5 +-
 29 files changed, 362 insertions(+), 275 deletions(-)
----------------------------------------------------------------------



[05/29] incubator-distributedlog git commit: DL: remove watches when unregister children watches

Posted by si...@apache.org.
DL: remove watches when unregister children watches

RB_ID=833858


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/b571d3b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/b571d3b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/b571d3b4

Branch: refs/heads/merge/DL-98
Commit: b571d3b4adcc140acca881979782474c27459d34
Parents: 517c77c
Author: Sijie Guo <si...@twitter.com>
Authored: Mon May 23 21:01:57 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:35:26 2016 -0800

----------------------------------------------------------------------
 .../twitter/distributedlog/ZooKeeperClient.java |  1 +
 .../distributedlog/zk/ZKWatcherManager.java     | 34 +++++++++++++++++++-
 .../distributedlog/zk/TestZKWatcherManager.java |  1 +
 3 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
index 912d592..9ea9e37 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/ZooKeeperClient.java
@@ -169,6 +169,7 @@ public class ZooKeeperClient {
         this.credentials = credentials;
         this.watcherManager = ZKWatcherManager.newBuilder()
                 .name(name)
+                .zkc(this)
                 .statsLogger(statsLogger.scope("watcher_manager"))
                 .build();
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
index 4068737..a24b560 100644
--- a/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
+++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/zk/ZKWatcherManager.java
@@ -17,8 +17,11 @@
  */
 package com.twitter.distributedlog.zk;
 
+import com.twitter.distributedlog.ZooKeeperClient;
 import org.apache.bookkeeper.stats.Gauge;
 import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.zookeeper.AsyncCallback;
+import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
@@ -50,31 +53,40 @@ public class ZKWatcherManager implements Watcher {
 
         private String _name;
         private StatsLogger _statsLogger;
+        private ZooKeeperClient _zkc;
 
         public Builder name(String name) {
             this._name = name;
             return this;
         }
 
+        public Builder zkc(ZooKeeperClient zkc) {
+            this._zkc = zkc;
+            return this;
+        }
+
         public Builder statsLogger(StatsLogger statsLogger) {
             this._statsLogger = statsLogger;
             return this;
         }
 
         public ZKWatcherManager build() {
-            return new ZKWatcherManager(_name, _statsLogger);
+            return new ZKWatcherManager(_name, _zkc, _statsLogger);
         }
     }
 
     private final String name;
+    private final ZooKeeperClient zkc;
     private final StatsLogger statsLogger;
 
     protected final ConcurrentMap<String, Set<Watcher>> childWatches;
     protected final AtomicInteger allWatchesGauge;
 
     private ZKWatcherManager(String name,
+                             ZooKeeperClient zkc,
                              StatsLogger statsLogger) {
         this.name = name;
+        this.zkc = zkc;
         this.statsLogger = statsLogger;
 
         // watches
@@ -141,6 +153,26 @@ public class ZKWatcherManager implements Watcher {
                 logger.warn("Remove a non-registered child watcher {} from path {}", watcher, path);
             }
             if (watchers.isEmpty()) {
+                // best-effort attempt to remove watches
+                try {
+                    if (null != zkc) {
+                        zkc.get().removeWatches(path, this, WatcherType.Children, true, new AsyncCallback.VoidCallback() {
+                            @Override
+                            public void processResult(int rc, String path, Object ctx) {
+                                if (KeeperException.Code.OK.intValue() == rc) {
+                                    logger.debug("Successfully removed children watches from {}", path);
+                                } else {
+                                    logger.debug("Encountered exception on removing children watches from {}",
+                                            path, KeeperException.create(KeeperException.Code.get(rc)));
+                                }
+                            }
+                        }, null);
+                    }
+                } catch (InterruptedException e) {
+                    logger.debug("Encountered exception on removing watches from {}", path, e);
+                } catch (ZooKeeperClient.ZooKeeperConnectionException e) {
+                    logger.debug("Encountered exception on removing watches from {}", path, e);
+                }
                 childWatches.remove(path, watchers);
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b571d3b4/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
index ee00ab9..6f269c3 100644
--- a/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
+++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/zk/TestZKWatcherManager.java
@@ -34,6 +34,7 @@ public class TestZKWatcherManager {
     public void testRegisterUnregisterWatcher() throws Exception {
         ZKWatcherManager watcherManager = ZKWatcherManager.newBuilder()
                 .name("test-register-unregister-watcher")
+                .zkc(null)
                 .statsLogger(NullStatsLogger.INSTANCE)
                 .build();
         String path = "/test-register-unregister-watcher";


[11/29] incubator-distributedlog git commit: Allow configuring flush interval in micros for distributedlog multi streams writer

Posted by si...@apache.org.
Allow configuring flush interval in micros for distributedlog multi streams writer

RB_ID=846974


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f008f751
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f008f751
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f008f751

Branch: refs/heads/merge/DL-98
Commit: f008f751ec0a020a670903b01c709007ada87381
Parents: f19e756
Author: Sijie Guo <si...@twitter.com>
Authored: Fri Jul 8 15:22:55 2016 -0700
Committer: Sijie Guo <si...@twitter.com>
Committed: Mon Dec 12 16:52:03 2016 -0800

----------------------------------------------------------------------

----------------------------------------------------------------------



[26/29] incubator-distributedlog git commit: Merge remote-tracking branch 'apache/master' into merge/DL-97

Posted by si...@apache.org.
Merge remote-tracking branch 'apache/master' into merge/DL-97


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/f95a0a83
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/f95a0a83
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/f95a0a83

Branch: refs/heads/merge/DL-98
Commit: f95a0a830b85e65c4a792223f57e9ccc1d74ea6d
Parents: b8e4c14 26942a9
Author: Sijie Guo <si...@apache.org>
Authored: Tue Dec 20 00:08:58 2016 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Tue Dec 20 00:08:58 2016 -0800

----------------------------------------------------------------------
 .../benchmark/stream/AsyncReaderBenchmark.java  |   6 +
 .../benchmark/stream/LedgerReadBenchmark.java   |   4 +
 .../benchmark/stream/SyncReaderBenchmark.java   |   6 +
 .../client/routing/TestRoutingService.java      |   4 +-
 .../metadata/ZkMetadataResolver.java            |   1 +
 .../com/twitter/distributedlog/DLMTestUtil.java |   4 +-
 .../distributedlog/TestAsyncReaderWriter.java   |   8 +-
 .../TestBKDistributedLogManager.java            |  10 +-
 .../TestBKDistributedLogNamespace.java          |  25 +++--
 .../distributedlog/TestDistributedLogBase.java  |  10 +-
 .../distributedlog/TestInterleavedReaders.java  |   2 +-
 .../twitter/distributedlog/TestTruncate.java    |   4 +-
 .../distributedlog/TestWriteLimiter.java        |  20 ++--
 .../bk/TestLedgerAllocatorPool.java             |   2 +-
 .../config/TestConcurrentBaseConfiguration.java |   2 +-
 .../impl/TestZKLogSegmentFilters.java           |   2 +-
 .../limiter/TestRequestLimiter.java             |   2 +-
 .../TestLogSegmentMetadataStoreUpdater.java     |   2 +-
 .../distributedlog/util/TestConfUtils.java      |   2 +-
 .../distributedlog/util/TestPermitManager.java  |   4 +-
 .../util/TestSafeQueueingFuturePool.java        |  10 +-
 .../src/main/thrift/service.thrift              | 111 ++++++++++---------
 .../service/stream/TestStreamManager.java       |   2 +-
 .../2016-09-19-kafka-vs-distributedlog.md       |   3 +-
 24 files changed, 136 insertions(+), 110 deletions(-)
----------------------------------------------------------------------



[28/29] incubator-distributedlog git commit: Merge branch 'master' into merge/DL-97

Posted by si...@apache.org.
Merge branch 'master' into merge/DL-97


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/83a4e092
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/83a4e092
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/83a4e092

Branch: refs/heads/merge/DL-98
Commit: 83a4e092dc9e1ae4e75f085f8ceb932b789df4c2
Parents: f95a0a8 3bd1620
Author: Sijie Guo <si...@twitter.com>
Authored: Tue Dec 20 17:02:40 2016 -0800
Committer: Sijie Guo <si...@twitter.com>
Committed: Tue Dec 20 17:02:40 2016 -0800

----------------------------------------------------------------------
 dev-support/DL_formatter.xml                    | 310 -------------
 distributedlog-build-tools/pom.xml              |  30 ++
 .../resources/distributedlog/checkstyle.xml     | 443 +++++++++++++++++++
 .../resources/distributedlog/suppressions.xml   |  29 ++
 .../main/resources/ide/eclipse/DL_formatter.xml | 310 +++++++++++++
 distributedlog-protocol/pom.xml                 |  33 ++
 .../java/com/twitter/distributedlog/DLSN.java   |  67 +--
 .../EnvelopedRecordSetReader.java               |  14 +-
 .../EnvelopedRecordSetWriter.java               |  25 +-
 .../com/twitter/distributedlog/LogRecord.java   |  75 ++--
 .../twitter/distributedlog/LogRecordSet.java    |  23 +-
 .../distributedlog/LogRecordWithDLSN.java       |  26 +-
 .../twitter/distributedlog/RecordStream.java    |   6 +-
 .../annotations/DistributedLogAnnotations.java  |   3 +
 .../annotations/package-info.java               |  21 +
 .../exceptions/AlreadyClosedException.java      |   6 +-
 .../AlreadyTruncatedTransactionException.java   |   4 +-
 .../exceptions/BKTransmitException.java         |   6 +-
 .../exceptions/ChecksumFailedException.java     |   3 +
 .../exceptions/DLClientClosedException.java     |   3 +
 .../distributedlog/exceptions/DLException.java  |   4 +-
 .../exceptions/DLIllegalStateException.java     |   3 +
 .../exceptions/EndOfStreamException.java        |   3 +
 .../exceptions/FlushException.java              |   4 +-
 .../exceptions/IdleReaderException.java         |   5 +
 .../exceptions/InternalServerException.java     |   3 +
 .../InvalidEnvelopedEntryException.java         |   2 +-
 .../exceptions/InvalidStreamNameException.java  |   3 +
 .../exceptions/LockCancelledException.java      |   3 +
 .../exceptions/LockingException.java            |   4 +-
 .../exceptions/LogEmptyException.java           |   4 +-
 .../exceptions/LogExistsException.java          |   2 +-
 .../exceptions/LogNotFoundException.java        |   4 +-
 .../exceptions/LogReadException.java            |   1 -
 .../exceptions/LogRecordTooLongException.java   |   5 +
 .../exceptions/MetadataException.java           |   3 +
 .../exceptions/NotYetImplementedException.java  |   3 +
 .../exceptions/OverCapacityException.java       |   6 +
 .../OwnershipAcquireFailedException.java        |   5 +
 .../exceptions/ReadCancelledException.java      |   3 +
 .../exceptions/RegionUnavailableException.java  |   5 +
 .../exceptions/RequestDeniedException.java      |   3 +
 .../exceptions/RetryableReadException.java      |   3 +
 .../exceptions/ServiceUnavailableException.java |   3 +
 .../exceptions/StreamNotReadyException.java     |   3 +
 .../exceptions/StreamUnavailableException.java  |   3 +
 .../exceptions/TooManyStreamsException.java     |   3 +
 .../TransactionIdOutOfOrderException.java       |   3 +
 .../exceptions/UnexpectedException.java         |   3 +
 .../UnsupportedMetadataVersionException.java    |   3 +
 .../exceptions/WriteCancelledException.java     |  11 +-
 .../exceptions/WriteException.java              |   6 +-
 .../distributedlog/exceptions/package-info.java |  21 +
 .../distributedlog/io/CompressionCodec.java     |   5 +-
 .../distributedlog/io/CompressionUtils.java     |  15 +-
 .../io/IdentityCompressionCodec.java            |  15 +-
 .../distributedlog/io/LZ4CompressionCodec.java  |  37 +-
 .../twitter/distributedlog/io/package-info.java |  21 +
 .../twitter/distributedlog/package-info.java    |  21 +
 .../distributedlog/util/BitMaskUtils.java       |  13 +-
 .../distributedlog/util/ProtocolUtils.java      |   5 +-
 .../distributedlog/util/package-info.java       |  21 +
 .../com/twitter/distributedlog/TestDLSN.java    |   7 +-
 .../distributedlog/TestLogRecordSet.java        |  17 +-
 pom.xml                                         |   1 +
 65 files changed, 1267 insertions(+), 488 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/83a4e092/pom.xml
----------------------------------------------------------------------