You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/04 08:44:00 UTC

[1/4] incubator-distributedlog git commit: DL-132: Enable Checkstyle for the distributedlog-service module.

Repository: incubator-distributedlog
Updated Branches:
  refs/heads/master 32a52a9f7 -> 1a30b0ceb


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java
index 4036298..d9c2ad1 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java
@@ -17,18 +17,24 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
-import com.google.common.collect.Sets;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
+import com.google.common.collect.Sets;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.junit.Test;
 
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link CountBasedStreamChooser}.
+ */
 public class TestCountBasedStreamChooser {
 
     @Test(timeout = 60000)
@@ -52,7 +58,7 @@ public class TestCountBasedStreamChooser {
             }
 
             CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
-            for (int k = 0; k < i+1; k++) {
+            for (int k = 0; k < i + 1; k++) {
                 assertNull(chooser.choose());
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java
index 59d1d10..04656bc 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java
@@ -17,27 +17,29 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.distributedlog.service.DistributedLogClient;
 import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
 import com.twitter.distributedlog.service.DistributedLogServerTestCase;
 import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import java.util.Set;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.Set;
-
-import static com.google.common.base.Charsets.*;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link SimpleBalancer}.
+ */
 public class TestSimpleBalancer extends DistributedLogServerTestCase {
 
-    static final Logger logger = LoggerFactory.getLogger(TestSimpleBalancer.class);
+    private static final Logger logger = LoggerFactory.getLogger(TestSimpleBalancer.class);
 
     DLClient targetClient;
     DLServer targetServer;
@@ -82,7 +84,7 @@ public class TestSimpleBalancer extends DistributedLogServerTestCase {
         // write to multiple streams
         for (int i = 0; i < numStreams; i++) {
             String name = namePrefix + i;
-            Await.result(((DistributedLogClient) dlClient.dlClient).write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
+            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
         }
 
         // validation
@@ -139,7 +141,7 @@ public class TestSimpleBalancer extends DistributedLogServerTestCase {
         // write to multiple streams
         for (int i = 0; i < numStreams; i++) {
             String name = namePrefix + i;
-            Await.result(((DistributedLogClient) dlClient.dlClient).write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
+            Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
         }
 
         // validation

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java
index 3f90f35..d666cf7 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java
@@ -17,20 +17,22 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.base.Optional;
 import com.twitter.distributedlog.service.DistributedLogClient;
 import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
 import com.twitter.distributedlog.service.DistributedLogServerTestCase;
 import com.twitter.util.Await;
+import java.nio.ByteBuffer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link StreamMover}.
+ */
 public class TestStreamMover extends DistributedLogServerTestCase {
 
     DLClient targetClient;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java
index c76dcb6..85ceb95 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java
@@ -17,9 +17,15 @@
  */
 package com.twitter.distributedlog.service.config;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import org.junit.Test;
-import static org.junit.Assert.*;
 
+/**
+ * Test Case for {@link ServerConfiguration}.
+ */
 public class TestServerConfiguration {
 
     @Test(timeout = 60000, expected = IllegalArgumentException.class)

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java
index e934879..462f4f3 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java
@@ -17,21 +17,25 @@
  */
 package com.twitter.distributedlog.service.config;
 
+import static com.twitter.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.config.PropertiesWriter;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
-import org.junit.Test;
-
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.junit.Test;
 
-import static com.twitter.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link StreamConfigProvider}.
+ */
 public class TestStreamConfigProvider {
     private static final String DEFAULT_CONFIG_DIR = "conf";
     private final String defaultConfigPath;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
index bde33c6..a12a64e 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,20 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -25,139 +39,138 @@ import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.runtime.BoxedUnit;
-
 import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
+/**
+ * Test Case for {@link LeastLoadPlacementPolicy}.
+ */
 public class TestLeastLoadPlacementPolicy {
 
-  @Test(timeout = 10000)
-  public void testCalculateBalances() throws Exception {
-    int numSevers = new Random().nextInt(20) + 1;
-    int numStreams = new Random().nextInt(200) + 1;
-    RoutingService mockRoutingService = mock(RoutingService.class);
-    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
-    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
-        new EqualLoadAppraiser(), mockRoutingService, mockNamespace, null, Duration.fromSeconds(600), new NullStatsLogger());
-    TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
-    long lowLoadPerServer = numStreams / numSevers;
-    long highLoadPerServer = lowLoadPerServer + 1;
-    for (ServerLoad serverLoad: serverLoads) {
-      long load = serverLoad.getLoad();
-      assertEquals(load, serverLoad.getStreamLoads().size());
-      assertTrue(String.format("Load %d is not between %d and %d", load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer);
+    @Test(timeout = 10000)
+    public void testCalculateBalances() throws Exception {
+        int numSevers = new Random().nextInt(20) + 1;
+        int numStreams = new Random().nextInt(200) + 1;
+        RoutingService mockRoutingService = mock(RoutingService.class);
+        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+            new EqualLoadAppraiser(),
+            mockRoutingService,
+            mockNamespace,
+            null,
+            Duration.fromSeconds(600),
+            new NullStatsLogger());
+        TreeSet<ServerLoad> serverLoads =
+            Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
+        long lowLoadPerServer = numStreams / numSevers;
+        long highLoadPerServer = lowLoadPerServer + 1;
+        for (ServerLoad serverLoad : serverLoads) {
+            long load = serverLoad.getLoad();
+            assertEquals(load, serverLoad.getStreamLoads().size());
+            assertTrue(String.format("Load %d is not between %d and %d",
+                load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer);
+        }
     }
-  }
 
-  @Test(timeout = 10000)
-  public void testRefreshAndPlaceStream() throws Exception {
-    int numSevers = new Random().nextInt(20) + 1;
-    int numStreams = new Random().nextInt(200) + 1;
-    RoutingService mockRoutingService = mock(RoutingService.class);
-    when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
-    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
-    try {
-      when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
-    } catch (IOException e) {
-      fail();
-    }
-    PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
-    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
-        new EqualLoadAppraiser(), mockRoutingService, mockNamespace, mockPlacementStateManager, Duration.fromSeconds(600), new NullStatsLogger());
-    leastLoadPlacementPolicy.refresh();
+    @Test(timeout = 10000)
+    public void testRefreshAndPlaceStream() throws Exception {
+        int numSevers = new Random().nextInt(20) + 1;
+        int numStreams = new Random().nextInt(200) + 1;
+        RoutingService mockRoutingService = mock(RoutingService.class);
+        when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
+        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+        try {
+            when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
+        } catch (IOException e) {
+            fail();
+        }
+        PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
+        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+            new EqualLoadAppraiser(),
+            mockRoutingService,
+            mockNamespace,
+            mockPlacementStateManager,
+            Duration.fromSeconds(600),
+            new NullStatsLogger());
+        leastLoadPlacementPolicy.refresh();
 
-    final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
-    verify(mockPlacementStateManager).saveOwnership(captor.capture());
-    TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>)captor.getValue();
-    ServerLoad next = serverLoads.first();
-    String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
-    assertEquals(next.getServer(), serverPlacement);
-  }
+        final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
+        verify(mockPlacementStateManager).saveOwnership(captor.capture());
+        TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>) captor.getValue();
+        ServerLoad next = serverLoads.first();
+        String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
+        assertEquals(next.getServer(), serverPlacement);
+    }
 
-  @Test(timeout = 10000)
-  public void testCalculateUnequalWeight() throws Exception {
-    int numSevers = new Random().nextInt(20) + 1;
-    int numStreams = new Random().nextInt(200) + 1;
+    @Test(timeout = 10000)
+    public void testCalculateUnequalWeight() throws Exception {
+        int numSevers = new Random().nextInt(20) + 1;
+        int numStreams = new Random().nextInt(200) + 1;
     /* use AtomicInteger to have a final object in answer method */
-    final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
-    RoutingService mockRoutingService = mock(RoutingService.class);
-    DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
-    LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
-    when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
-      @Override
-      public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable {
-        int load = new Random().nextInt(100000);
-        if (load > maxLoad.get()) {
-          maxLoad.set(load);
+        final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
+        RoutingService mockRoutingService = mock(RoutingService.class);
+        DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+        LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
+        when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
+            @Override
+            public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable {
+                int load = new Random().nextInt(100000);
+                if (load > maxLoad.get()) {
+                    maxLoad.set(load);
+                }
+                return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
+            }
+        });
+        LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+            mockLoadAppraiser,
+            mockRoutingService,
+            mockNamespace,
+            null,
+            Duration.fromSeconds(600),
+            new NullStatsLogger());
+        TreeSet<ServerLoad> serverLoads =
+            Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
+        long highestLoadSeen = Long.MIN_VALUE;
+        long lowestLoadSeen = Long.MAX_VALUE;
+        for (ServerLoad serverLoad : serverLoads) {
+            long load = serverLoad.getLoad();
+            if (load < lowestLoadSeen) {
+                lowestLoadSeen = load;
+            }
+            if (load > highestLoadSeen) {
+                highestLoadSeen = load;
+            }
         }
-        return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
-      }
-    });
-    LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
-        mockLoadAppraiser, mockRoutingService, mockNamespace, null, Duration.fromSeconds(600), new NullStatsLogger());
-    TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
-    long highestLoadSeen = Long.MIN_VALUE;
-    long lowestLoadSeen = Long.MAX_VALUE;
-    for (ServerLoad serverLoad: serverLoads) {
-      long load = serverLoad.getLoad();
-      if (load < lowestLoadSeen) {
-        lowestLoadSeen = load;
-      }
-      if (load > highestLoadSeen) {
-        highestLoadSeen = load;
-      }
-    }
-    assertTrue("Unexpected placement for " + numStreams + " streams to "
-            + numSevers + " servers : highest load = " + highestLoadSeen
-            + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(),
+        assertTrue("Unexpected placement for " + numStreams + " streams to "
+                + numSevers + " servers : highest load = " + highestLoadSeen
+                + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(),
             highestLoadSeen - lowestLoadSeen < maxLoad.get());
-  }
+    }
 
-  private Set<SocketAddress> generateSocketAddresses(int num) {
-    LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
-    for (int i = 0; i < num; i++) {
-      socketAddresses.add(new InetSocketAddress(i));
+    private Set<SocketAddress> generateSocketAddresses(int num) {
+        LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
+        for (int i = 0; i < num; i++) {
+            socketAddresses.add(new InetSocketAddress(i));
+        }
+        return socketAddresses;
     }
-    return socketAddresses;
-  }
 
-  private Set<String> generateStreams(int num) {
-    LinkedHashSet<String> streams = new LinkedHashSet<String>();
-    for (int i = 0; i < num; i++) {
-      streams.add("stream_" + i);
+    private Set<String> generateStreams(int num) {
+        LinkedHashSet<String> streams = new LinkedHashSet<String>();
+        for (int i = 0; i < num; i++) {
+            streams.add("stream_" + i);
+        }
+        return streams;
     }
-    return streams;
-  }
 
-  private Set<String> generateServers(int num) {
-    LinkedHashSet<String> servers = new LinkedHashSet<String>();
-    for (int i = 0; i < num; i++) {
-      servers.add("server_" + i);
+    private Set<String> generateServers(int num) {
+        LinkedHashSet<String> servers = new LinkedHashSet<String>();
+        for (int i = 0; i < num; i++) {
+            servers.add("server_" + i);
+        }
+        return servers;
     }
-    return servers;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
index d844f78..42aeddd 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,32 +17,34 @@
  */
 package com.twitter.distributedlog.service.placement;
 
-import java.io.IOException;
+import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
+/**
+ * Test Case for {@link ServerLoad}.
+ */
 public class TestServerLoad {
 
-  @Test(timeout = 60000)
-  public void testSerializeDeserialize() throws IOException {
-    final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
-    for (int i = 0; i < 20; i++) {
-      serverLoad.addStream(new StreamLoad("stream-"+i, i));
+    @Test(timeout = 60000)
+    public void testSerializeDeserialize() throws IOException {
+        final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+        for (int i = 0; i < 20; i++) {
+            serverLoad.addStream(new StreamLoad("stream-" + i, i));
+        }
+        assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
     }
-    assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
-  }
 
-  @Test(timeout = 60000)
-  public void testGetLoad() throws IOException {
-    final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
-    assertEquals(0, serverLoad.getLoad());
-    serverLoad.addStream(new StreamLoad("stream-"+1, 3));
-    assertEquals(3, serverLoad.getLoad());
-    serverLoad.addStream(new StreamLoad("stream-"+2, 7));
-    assertEquals(10, serverLoad.getLoad());
-    serverLoad.addStream(new StreamLoad("stream-"+3, 1));
-    assertEquals(11, serverLoad.getLoad());
-  }
+    @Test(timeout = 60000)
+    public void testGetLoad() throws IOException {
+        final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+        assertEquals(0, serverLoad.getLoad());
+        serverLoad.addStream(new StreamLoad("stream-" + 1, 3));
+        assertEquals(3, serverLoad.getLoad());
+        serverLoad.addStream(new StreamLoad("stream-" + 2, 7));
+        assertEquals(10, serverLoad.getLoad());
+        serverLoad.addStream(new StreamLoad("stream-" + 3, 1));
+        assertEquals(11, serverLoad.getLoad());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
index e5091f5..aac30d4 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,19 +17,21 @@
  */
 package com.twitter.distributedlog.service.placement;
 
-import java.io.IOException;
+import static org.junit.Assert.assertEquals;
 
+import java.io.IOException;
 import org.junit.Test;
 
-import static org.junit.Assert.assertEquals;
-
+/**
+ * Test Case for {@link StreamLoad}.
+ */
 public class TestStreamLoad {
 
-  @Test(timeout = 10000)
-  public void testSerializeDeserialize() throws IOException {
-    final String streamName = "aHellaRandomStreamName";
-    final int load = 1337;
-    final StreamLoad streamLoad = new StreamLoad(streamName, load);
-    assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize()));
-  }
+    @Test(timeout = 10000)
+    public void testSerializeDeserialize() throws IOException {
+        final String streamName = "aHellaRandomStreamName";
+        final int load = 1337;
+        final StreamLoad streamLoad = new StreamLoad(streamName, load);
+        assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
index c02492d..1d11219 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,117 +17,120 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.twitter.distributedlog.DistributedLogConfiguration;
 import java.io.IOException;
 import java.net.URI;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.curator.test.TestingServer;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.twitter.distributedlog.DistributedLogConfiguration;
-
-import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
+/**
+ * Test Case for {@link ZKPlacementStateManager}.
+ */
 public class TestZKPlacementStateManager {
-  private TestingServer zkTestServer;
-  private String zkServers;
-  private URI uri;
-  private ZKPlacementStateManager zkPlacementStateManager;
-
-  @Before
-  public void startZookeeper() throws Exception {
-    zkTestServer = new TestingServer(2181);
-    zkServers = "127.0.0.1:2181";
-    uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
-    zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
-  }
-
-  @Test(timeout = 60000)
-  public void testSaveLoad() throws Exception {
-    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
-    zkPlacementStateManager.saveOwnership(ownerships);
-    SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
-    assertEquals(ownerships, loadedOwnerships);
-
-    ownerships.add(new ServerLoad("emptyServer"));
-    zkPlacementStateManager.saveOwnership(ownerships);
-    loadedOwnerships = zkPlacementStateManager.loadOwnership();
-    assertEquals(ownerships, loadedOwnerships);
-
-    ServerLoad sl1 = new ServerLoad("server1");
-    sl1.addStream(new StreamLoad("stream1", 3));
-    sl1.addStream(new StreamLoad("stream2", 4));
-    ServerLoad sl2 = new ServerLoad("server2");
-    sl2.addStream(new StreamLoad("stream3", 1));
-    ownerships.add(sl1);
-    ownerships.add(sl2);
-    zkPlacementStateManager.saveOwnership(ownerships);
-    loadedOwnerships = zkPlacementStateManager.loadOwnership();
-    assertEquals(ownerships, loadedOwnerships);
+    private TestingServer zkTestServer;
+    private String zkServers;
+    private URI uri;
+    private ZKPlacementStateManager zkPlacementStateManager;
+
+    @Before
+    public void startZookeeper() throws Exception {
+        zkTestServer = new TestingServer(2181);
+        zkServers = "127.0.0.1:2181";
+        uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
+        zkPlacementStateManager =
+            new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+    }
 
-    loadedOwnerships.remove(sl1);
-    zkPlacementStateManager.saveOwnership(ownerships);
-    loadedOwnerships = zkPlacementStateManager.loadOwnership();
-    assertEquals(ownerships, loadedOwnerships);
-  }
+    @Test(timeout = 60000)
+    public void testSaveLoad() throws Exception {
+        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+        zkPlacementStateManager.saveOwnership(ownerships);
+        SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
+        assertEquals(ownerships, loadedOwnerships);
+
+        ownerships.add(new ServerLoad("emptyServer"));
+        zkPlacementStateManager.saveOwnership(ownerships);
+        loadedOwnerships = zkPlacementStateManager.loadOwnership();
+        assertEquals(ownerships, loadedOwnerships);
+
+        ServerLoad sl1 = new ServerLoad("server1");
+        sl1.addStream(new StreamLoad("stream1", 3));
+        sl1.addStream(new StreamLoad("stream2", 4));
+        ServerLoad sl2 = new ServerLoad("server2");
+        sl2.addStream(new StreamLoad("stream3", 1));
+        ownerships.add(sl1);
+        ownerships.add(sl2);
+        zkPlacementStateManager.saveOwnership(ownerships);
+        loadedOwnerships = zkPlacementStateManager.loadOwnership();
+        assertEquals(ownerships, loadedOwnerships);
+
+        loadedOwnerships.remove(sl1);
+        zkPlacementStateManager.saveOwnership(ownerships);
+        loadedOwnerships = zkPlacementStateManager.loadOwnership();
+        assertEquals(ownerships, loadedOwnerships);
+    }
 
-  private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
-          LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
-          int expectedNumServerLoads) throws InterruptedException {
-    TreeSet<ServerLoad> notification = notificationQueue.take();
-    assertNotNull(notification);
-    while (notification.size() < expectedNumServerLoads) {
-      notification = notificationQueue.take();
+    private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
+        LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
+        int expectedNumServerLoads) throws InterruptedException {
+        TreeSet<ServerLoad> notification = notificationQueue.take();
+        assertNotNull(notification);
+        while (notification.size() < expectedNumServerLoads) {
+            notification = notificationQueue.take();
+        }
+        assertEquals(expectedNumServerLoads, notification.size());
+        return notification;
     }
-    assertEquals(expectedNumServerLoads, notification.size());
-    return notification;
-  }
 
-  @Test(timeout = 60000)
-  public void testWatchIndefinitely() throws Exception {
-    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
-    ownerships.add(new ServerLoad("server1"));
-    final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
+    @Test(timeout = 60000)
+    public void testWatchIndefinitely() throws Exception {
+        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+        ownerships.add(new ServerLoad("server1"));
+        final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
             new LinkedBlockingQueue<TreeSet<ServerLoad>>();
-    PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
-      @Override
-      public void callback(TreeSet<ServerLoad> serverLoads) {
-        serverLoadNotifications.add(serverLoads);
-      }
-    };
-    zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
-    zkPlacementStateManager.watch(callback);
-    // cannot verify the callback here as it may call before the verify is called
-
-    zkPlacementStateManager.saveOwnership(ownerships);
-    assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
-
-    ServerLoad server2 = new ServerLoad("server2");
-    server2.addStream(new StreamLoad("hella-important-stream", 415));
-    ownerships.add(server2);
-    zkPlacementStateManager.saveOwnership(ownerships);
-    assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
-  }
+        PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
+            @Override
+            public void callback(TreeSet<ServerLoad> serverLoads) {
+                serverLoadNotifications.add(serverLoads);
+            }
+        };
+        zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
+        zkPlacementStateManager.watch(callback);
+        // cannot verify the callback here as it may call before the verify is called
+
+        zkPlacementStateManager.saveOwnership(ownerships);
+        assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
+
+        ServerLoad server2 = new ServerLoad("server2");
+        server2.addStream(new StreamLoad("hella-important-stream", 415));
+        ownerships.add(server2);
+        zkPlacementStateManager.saveOwnership(ownerships);
+        assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
+    }
 
-  @Test(timeout = 60000)
-  public void testZkFormatting() throws Exception {
-    final String server = "host/10.0.0.0:31351";
-    final String zkFormattedServer = "host--10.0.0.0:31351";
-    URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
-    ZKPlacementStateManager zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
-    assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
-    assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
-  }
+    @Test(timeout = 60000)
+    public void testZkFormatting() throws Exception {
+        final String server = "host/10.0.0.0:31351";
+        final String zkFormattedServer = "host--10.0.0.0:31351";
+        URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
+        ZKPlacementStateManager zkPlacementStateManager =
+            new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+        assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
+        assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
+    }
 
-  @After
-  public void stopZookeeper() throws IOException {
-    zkTestServer.stop();
-  }
+    @After
+    public void stopZookeeper() throws IOException {
+        zkTestServer.stop();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java
index 71e46db..283c290 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java
@@ -17,8 +17,13 @@
  */
 package com.twitter.distributedlog.service.stream;
 
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -27,13 +32,12 @@ import com.twitter.distributedlog.service.config.StreamConfigProvider;
 import com.twitter.distributedlog.service.streamset.Partition;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.util.Await;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
 /**
  * Test Case for StreamManager.
  */
@@ -53,9 +57,9 @@ public class TestStreamManager {
         StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class);
         StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class);
         when(mockStreamFactory.create(
-                (String)any(),
+                (String) any(),
                 (DynamicDistributedLogConfiguration) any(),
-                (StreamManager)any())).thenReturn(mockStream);
+                (StreamManager) any())).thenReturn(mockStream);
         StreamManager streamManager = new StreamManagerImpl(
                 "",
                 new DistributedLogConfiguration(),
@@ -108,7 +112,11 @@ public class TestStreamManager {
         StreamFactory mockStreamFactory = mock(StreamFactory.class);
         StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class);
         StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class);
-        when(mockStreamFactory.create((String)any(), (DynamicDistributedLogConfiguration)any(), (StreamManager)any())).thenReturn(mockStream);
+        when(mockStreamFactory.create(
+            (String) any(),
+            (DynamicDistributedLogConfiguration) any(),
+            (StreamManager) any())
+        ).thenReturn(mockStream);
         DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class);
         ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
index 41b4c69..7e52ff2 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
@@ -17,6 +17,11 @@
  */
 package com.twitter.distributedlog.service.stream;
 
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.LogRecord;
@@ -30,37 +35,21 @@ import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Await;
 import com.twitter.util.Future;
+import java.nio.ByteBuffer;
 import org.apache.bookkeeper.feature.SettableFeature;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.zip.CRC32;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
 
 /**
  * Test Case for StreamOps.
  */
 public class TestStreamOp {
 
-    static final Logger logger = LoggerFactory.getLogger(TestStreamOp.class);
-
     @Rule
     public TestName testName = new TestName();
 
-    private final ThreadLocal<CRC32> requestCRC = new ThreadLocal<CRC32>() {
-        @Override
-        protected CRC32 initialValue() {
-            return new CRC32();
-        }
-    };
-
     private WriteOp getWriteOp() {
         SettableFeature disabledFeature = new SettableFeature("", 0);
         return new WriteOp("test",
@@ -69,7 +58,7 @@ public class TestStreamOp {
             new NullStatsLogger(),
             new IdentityStreamPartitionConverter(),
             new ServerConfiguration(),
-            (byte)0,
+            (byte) 0,
             null,
             false,
             disabledFeature,
@@ -90,7 +79,7 @@ public class TestStreamOp {
     @Test(timeout = 60000)
     public void testResponseSucceededThenFailed() throws Exception {
         AsyncLogWriter writer = mock(AsyncLogWriter.class);
-        when(writer.write((LogRecord)any())).thenReturn(Future.value(new DLSN(1,2,3)));
+        when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3)));
         when(writer.getStreamName()).thenReturn("test");
         WriteOp writeOp = getWriteOp();
         writeOp.execute(writer, new Sequencer() {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
index e2fa53b..a65e51a 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
@@ -17,31 +17,33 @@
  */
 package com.twitter.distributedlog.service.stream.limiter;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.config.ConcurrentConstConfiguration;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.OverCapacityException;
+import com.twitter.distributedlog.limiter.ChainedRequestLimiter;
+import com.twitter.distributedlog.limiter.ComposableRequestLimiter;
 import com.twitter.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
 import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
-import com.twitter.distributedlog.limiter.ComposableRequestLimiter;
-import com.twitter.distributedlog.limiter.ChainedRequestLimiter;
 import com.twitter.distributedlog.limiter.GuavaRateLimiter;
 import com.twitter.distributedlog.limiter.RateLimiter;
 import com.twitter.distributedlog.limiter.RequestLimiter;
-
 import java.util.concurrent.atomic.AtomicInteger;
-
 import org.apache.bookkeeper.feature.SettableFeature;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
 
+/**
+ * Test Case for {@link ServiceRequestLimiter}.
+ */
 public class TestServiceRequestLimiter {
-    static final Logger LOG = LoggerFactory.getLogger(TestServiceRequestLimiter.class);
 
+    /**
+     * Mock Request.
+     */
     class MockRequest {
         int size;
         MockRequest() {
@@ -55,11 +57,17 @@ public class TestServiceRequestLimiter {
         }
     }
 
+    /**
+     * Mock request limiter.
+     */
     class MockRequestLimiter implements RequestLimiter<MockRequest> {
         public void apply(MockRequest request) {
         }
     }
 
+    /**
+     * Counter based limiter.
+     */
     static class CounterLimiter implements RateLimiter {
         final int limit;
         int count;
@@ -78,6 +86,9 @@ public class TestServiceRequestLimiter {
         }
     }
 
+    /**
+     * Mock hard request limiter.
+     */
     class MockHardRequestLimiter implements RequestLimiter<MockRequest> {
 
         RequestLimiter<MockRequest> limiter;
@@ -114,6 +125,9 @@ public class TestServiceRequestLimiter {
         }
     }
 
+    /**
+     * Mock soft request limiter.
+     */
     class MockSoftRequestLimiter implements RequestLimiter<MockRequest> {
 
         RequestLimiter<MockRequest> limiter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java
index 2275bd6..af7b7e5 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java
@@ -17,12 +17,12 @@
  */
 package com.twitter.distributedlog.service.streamset;
 
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
 
 /**
- * Test Cases for {@link DelimiterStreamPartitionConverter}
+ * Test Cases for {@link DelimiterStreamPartitionConverter}.
  */
 public class TestDelimiterStreamPartitionConverter {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java
index 327faf2..d0c6660 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java
@@ -17,10 +17,14 @@
  */
 package com.twitter.distributedlog.service.streamset;
 
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
 
+/**
+ * Test Case for {@link IdentityStreamPartitionConverter}.
+ */
 public class TestIdentityStreamPartitionConverter {
 
     @Test(timeout = 20000)

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java
index e87fdc6..529397e 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java
@@ -17,12 +17,13 @@
  */
 package com.twitter.distributedlog.service.streamset;
 
-import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
 
 /**
- * Test {@link PartitionMap}
+ * Test {@link PartitionMap}.
  */
 public class TestPartitionMap {
 


[4/4] incubator-distributedlog git commit: DL-132: Enable check style for distributedlog service module.

Posted by si...@apache.org.
DL-132: Enable check style for distributedlog service module.

Author: Xi Liu <xi...@gmail.com>

Reviewers: Sijie Guo <si...@apache.org>

Closes #89 from xiliuant/xi/checkstyle_service


Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/1a30b0ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/1a30b0ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/1a30b0ce

Branch: refs/heads/master
Commit: 1a30b0ceb76f33eda08b611d97c150f45f239a95
Parents: 32a52a9
Author: Xi Liu <xi...@gmail.com>
Authored: Wed Jan 4 00:43:56 2017 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Wed Jan 4 00:43:56 2017 -0800

----------------------------------------------------------------------
 .../resources/distributedlog/checkstyle.xml     |   2 +-
 distributedlog-service/pom.xml                  |  33 ++
 .../distributedlog/service/ClientUtils.java     |   3 +
 .../service/DistributedLogCluster.java          |  78 ++---
 .../service/DistributedLogServer.java           |  64 ++--
 .../service/DistributedLogServerApp.java        |  38 ++-
 .../service/DistributedLogServiceImpl.java      |  48 +--
 .../service/FatalErrorHandler.java              |   7 -
 .../distributedlog/service/MonitorService.java  |  29 +-
 .../service/MonitorServiceApp.java              |  16 +-
 .../service/ServerFeatureKeys.java              |   2 +-
 .../distributedlog/service/StatsFilter.java     |   8 +-
 .../service/announcer/Announcer.java            |   2 +-
 .../service/announcer/NOPAnnouncer.java         |   3 +
 .../service/announcer/ServerSetAnnouncer.java   |  10 +-
 .../service/announcer/package-info.java         |  21 ++
 .../service/balancer/Balancer.java              |   5 +
 .../service/balancer/BalancerTool.java          |  42 ++-
 .../service/balancer/ClusterBalancer.java       |  27 +-
 .../balancer/CountBasedStreamChooser.java       |   9 +-
 .../service/balancer/LimitedStreamChooser.java  |  10 +
 .../service/balancer/SimpleBalancer.java        |  17 +-
 .../service/balancer/StreamChooser.java         |   2 +-
 .../service/balancer/StreamMover.java           |   5 +-
 .../service/balancer/StreamMoverImpl.java       |   2 +-
 .../service/balancer/package-info.java          |  21 ++
 .../config/DefaultStreamConfigProvider.java     |  29 +-
 .../service/config/ServerConfiguration.java     | 106 ++++---
 .../config/ServiceStreamConfigProvider.java     |  19 +-
 .../service/config/package-info.java            |  21 ++
 .../distributedlog/service/package-info.java    |  21 ++
 .../service/placement/EqualLoadAppraiser.java   |  26 +-
 .../placement/LeastLoadPlacementPolicy.java     | 311 ++++++++++---------
 .../service/placement/LoadAppraiser.java        |  18 +-
 .../service/placement/PlacementPolicy.java      | 194 ++++++------
 .../placement/PlacementStateManager.java        |  80 +++--
 .../service/placement/ServerLoad.java           | 227 +++++++-------
 .../service/placement/StreamLoad.java           | 141 +++++----
 .../placement/ZKPlacementStateManager.java      | 236 +++++++-------
 .../service/placement/package-info.java         |  21 ++
 .../service/stream/AbstractStreamOp.java        |  23 +-
 .../service/stream/AbstractWriteOp.java         |   9 +-
 .../service/stream/BulkWriteOp.java             |  29 +-
 .../distributedlog/service/stream/DeleteOp.java |   7 +-
 .../service/stream/HeartbeatOp.java             |  11 +-
 .../service/stream/ReleaseOp.java               |   7 +-
 .../distributedlog/service/stream/Stream.java   |   7 +-
 .../service/stream/StreamFactory.java           |   3 +
 .../service/stream/StreamFactoryImpl.java       |   3 +
 .../service/stream/StreamImpl.java              |  34 +-
 .../service/stream/StreamManager.java           |   4 +-
 .../service/stream/StreamManagerImpl.java       |   5 +-
 .../distributedlog/service/stream/StreamOp.java |   2 -
 .../service/stream/StreamOpStats.java           |   2 +-
 .../service/stream/TruncateOp.java              |  11 +-
 .../distributedlog/service/stream/WriteOp.java  |  17 +-
 .../service/stream/WriteOpWithPayload.java      |   3 +
 .../service/stream/admin/AdminOp.java           |   6 +-
 .../service/stream/admin/CreateOp.java          |   7 +-
 .../service/stream/admin/StreamAdminOp.java     |   5 +-
 .../service/stream/admin/package-info.java      |  21 ++
 .../stream/limiter/DynamicRequestLimiter.java   |  21 +-
 .../stream/limiter/RequestLimiterBuilder.java   |  24 +-
 .../stream/limiter/ServiceRequestLimiter.java   |   8 +-
 .../stream/limiter/StreamAcquireLimiter.java    |   6 +-
 .../stream/limiter/StreamRequestLimiter.java    |   5 +-
 .../service/stream/limiter/package-info.java    |  21 ++
 .../service/stream/package-info.java            |  21 ++
 .../CacheableStreamPartitionConverter.java      |   3 +
 .../DelimiterStreamPartitionConverter.java      |   2 +-
 .../service/streamset/Partition.java            |   1 +
 .../service/streamset/PartitionMap.java         |   3 +
 .../service/streamset/package-info.java         |  21 ++
 .../distributedlog/service/tools/ProxyTool.java |  30 +-
 .../service/tools/package-info.java             |  21 ++
 .../service/utils/package-info.java             |  21 ++
 .../stats/CodahaleMetricsServletProvider.java   |   4 +-
 .../HealthCheckServletContextListener.java      |   2 +-
 .../stats/MetricsServletContextListener.java    |   3 +
 .../bookkeeper/stats/ServletReporter.java       |   2 +-
 .../apache/bookkeeper/stats/package-info.java   |  21 ++
 .../client/routing/LocalRoutingService.java     |   7 +-
 .../service/DistributedLogServerTestCase.java   |  33 +-
 .../service/TestDistributedLogServerBase.java   |  62 ++--
 .../TestDistributedLogServerClientRouting.java  |   9 +-
 .../service/TestDistributedLogService.java      |  88 +++---
 .../service/TestRegionUnavailable.java          |  22 +-
 .../distributedlog/service/TestStatsFilter.java |  11 +-
 .../service/balancer/TestBalancerUtils.java     |   8 +-
 .../service/balancer/TestClusterBalancer.java   |  26 +-
 .../balancer/TestCountBasedStreamChooser.java   |  16 +-
 .../service/balancer/TestSimpleBalancer.java    |  22 +-
 .../service/balancer/TestStreamMover.java       |  12 +-
 .../service/config/TestServerConfiguration.java |   8 +-
 .../config/TestStreamConfigProvider.java        |  14 +-
 .../placement/TestLeastLoadPlacementPolicy.java | 247 ++++++++-------
 .../service/placement/TestServerLoad.java       |  50 +--
 .../service/placement/TestStreamLoad.java       |  28 +-
 .../placement/TestZKPlacementStateManager.java  | 197 ++++++------
 .../service/stream/TestStreamManager.java       |  24 +-
 .../service/stream/TestStreamOp.java            |  27 +-
 .../limiter/TestServiceRequestLimiter.java      |  32 +-
 .../TestDelimiterStreamPartitionConverter.java  |   6 +-
 .../TestIdentityStreamPartitionConverter.java   |   8 +-
 .../service/streamset/TestPartitionMap.java     |   7 +-
 105 files changed, 1974 insertions(+), 1400 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
----------------------------------------------------------------------
diff --git a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
index e1117c8..db3549f 100644
--- a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
+++ b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
@@ -260,7 +260,7 @@ page at http://checkstyle.sourceforge.net/config.html -->
     T, K, V, W, X or else be capital-case terminated with a T,
     such as MyGenericParameterT -->
     <module name="ClassTypeParameterName">
-      <property name="format" value="^(((T|K|V|W|X)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
+      <property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*))$"/>
       <property name="severity" value="error"/>
     </module>
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml
index 052ce15..154dedb 100644
--- a/distributedlog-service/pom.xml
+++ b/distributedlog-service/pom.xml
@@ -197,6 +197,39 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.17</version>
+        <dependencies>
+          <dependency>
+            <groupId>com.puppycrawl.tools</groupId>
+            <artifactId>checkstyle</artifactId>
+            <version>6.19</version>
+          </dependency>
+          <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>distributedlog-build-tools</artifactId>
+            <version>${project.version}</version>
+          </dependency>
+        </dependencies>
+        <configuration>
+          <configLocation>distributedlog/checkstyle.xml</configLocation>
+          <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
+          <consoleOutput>true</consoleOutput>
+          <failOnViolation>true</failOnViolation>
+          <includeResources>false</includeResources>
+          <includeTestSourceDirectory>true</includeTestSourceDirectory>
+        </configuration>
+        <executions>
+          <execution>
+            <phase>test-compile</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
   <profiles>

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
index dd9961d..da36014 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
@@ -21,6 +21,9 @@ import com.twitter.distributedlog.client.DistributedLogClientImpl;
 import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
 import org.apache.commons.lang3.tuple.Pair;
 
+/**
+ * DistributedLog Client Related Utils.
+ */
 public class ClientUtils {
 
     public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index a2a0ca6..029c822 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -25,6 +25,13 @@ import com.twitter.distributedlog.metadata.DLMetadata;
 import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.finagle.builder.Server;
+import java.io.File;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
 import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -35,14 +42,6 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
 /**
  * DistributedLog Cluster is an emulator to run distributedlog components.
  */
@@ -59,18 +58,18 @@ public class DistributedLogCluster {
      */
     public static class Builder {
 
-        int _numBookies = 3;
-        boolean _shouldStartZK = true;
-        String _zkHost = "127.0.0.1";
-        int _zkPort = 0;
-        boolean _shouldStartProxy = true;
-        int _proxyPort = 7000;
-        boolean _thriftmux = false;
-        DistributedLogConfiguration _dlConf = new DistributedLogConfiguration()
+        int numBookies = 3;
+        boolean shouldStartZK = true;
+        String zkHost = "127.0.0.1";
+        int zkPort = 0;
+        boolean shouldStartProxy = true;
+        int proxyPort = 7000;
+        boolean thriftmux = false;
+        DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
                 .setLockTimeout(10)
                 .setOutputBufferSize(0)
                 .setImmediateFlushEnabled(true);
-        ServerConfiguration _bkConf = new ServerConfiguration();
+        ServerConfiguration bkConf = new ServerConfiguration();
 
         private Builder() {}
 
@@ -80,7 +79,7 @@ public class DistributedLogCluster {
          * @return builder
          */
         public Builder numBookies(int numBookies) {
-            this._numBookies = numBookies;
+            this.numBookies = numBookies;
             return this;
         }
 
@@ -92,7 +91,7 @@ public class DistributedLogCluster {
          * @return builder
          */
         public Builder shouldStartZK(boolean startZK) {
-            this._shouldStartZK = startZK;
+            this.shouldStartZK = startZK;
             return this;
         }
 
@@ -104,7 +103,7 @@ public class DistributedLogCluster {
          * @return builder
          */
         public Builder zkServers(String zkServers) {
-            this._zkHost = zkServers;
+            this.zkHost = zkServers;
             return this;
         }
 
@@ -116,7 +115,7 @@ public class DistributedLogCluster {
          * @return builder.
          */
         public Builder zkPort(int zkPort) {
-            this._zkPort = zkPort;
+            this.zkPort = zkPort;
             return this;
         }
 
@@ -128,7 +127,7 @@ public class DistributedLogCluster {
          * @return builder
          */
         public Builder shouldStartProxy(boolean startProxy) {
-            this._shouldStartProxy = startProxy;
+            this.shouldStartProxy = startProxy;
             return this;
         }
 
@@ -140,60 +139,63 @@ public class DistributedLogCluster {
          * @return builder
          */
         public Builder proxyPort(int proxyPort) {
-            this._proxyPort = proxyPort;
+            this.proxyPort = proxyPort;
             return this;
         }
 
         /**
-         * DistributedLog Configuration
+         * Set the distributedlog configuration.
          *
          * @param dlConf
          *          distributedlog configuration
          * @return builder
          */
         public Builder dlConf(DistributedLogConfiguration dlConf) {
-            this._dlConf = dlConf;
+            this.dlConf = dlConf;
             return this;
         }
 
         /**
-         * Bookkeeper server configuration
+         * Set the Bookkeeper server configuration.
          *
          * @param bkConf
          *          bookkeeper server configuration
          * @return builder
          */
         public Builder bkConf(ServerConfiguration bkConf) {
-            this._bkConf = bkConf;
+            this.bkConf = bkConf;
             return this;
         }
 
         /**
-         * Enable thriftmux for the dl server
+         * Enable thriftmux for the dl server.
          *
          * @param enabled flag to enable thriftmux
          * @return builder
          */
         public Builder thriftmux(boolean enabled) {
-            this._thriftmux = enabled;
+            this.thriftmux = enabled;
             return this;
         }
 
         public DistributedLogCluster build() throws Exception {
             // build the cluster
             return new DistributedLogCluster(
-                    _dlConf,
-                    _bkConf,
-                    _numBookies,
-                    _shouldStartZK,
-                    _zkHost,
-                    _zkPort,
-                    _shouldStartProxy,
-                    _proxyPort,
-                    _thriftmux);
+                dlConf,
+                bkConf,
+                numBookies,
+                shouldStartZK,
+                zkHost,
+                zkPort,
+                shouldStartProxy,
+                proxyPort,
+                thriftmux);
         }
     }
 
+    /**
+     * Run a distributedlog proxy server.
+     */
     public static class DLServer {
 
         static final int MAX_RETRIES = 20;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index a9ba125..248bcf7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -17,32 +17,8 @@
  */
 package com.twitter.distributedlog.service;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.Tuple2;
-
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicConfigurationFactory;
@@ -72,10 +48,33 @@ import com.twitter.finagle.thrift.ClientIdRequiredFilter;
 import com.twitter.finagle.thrift.ThriftServerFramedCodec;
 import com.twitter.finagle.transport.Transport;
 import com.twitter.util.Duration;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
 
+/**
+ * Running the distributedlog proxy server.
+ */
 public class DistributedLogServer {
 
-    static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
     private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
 
     private DistributedLogServiceImpl dlService = null;
@@ -124,7 +123,8 @@ public class DistributedLogServer {
         this.loadAppraiserClassStr = loadAppraiserClass;
     }
 
-    public void runServer() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+    public void runServer()
+        throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
         if (!uri.isPresent()) {
             throw new IllegalArgumentException("No distributedlog uri provided.");
         }
@@ -135,7 +135,8 @@ public class DistributedLogServer {
             try {
                 dlConf.loadConf(new File(configFile).toURI().toURL());
             } catch (ConfigurationException e) {
-                throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + ".");
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+                    + configFile + ".");
             } catch (MalformedURLException e) {
                 throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
                         + configFile + ".");
@@ -185,7 +186,8 @@ public class DistributedLogServer {
         }
         Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
         LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
-        logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
+        logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get()
+            + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
 
         StreamConfigProvider streamConfProvider =
                 getStreamConfigProvider(dlConf, converter);
@@ -227,7 +229,8 @@ public class DistributedLogServer {
         }
     }
 
-    private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf) throws ConfigurationException {
+    private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
+        throws ConfigurationException {
         Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent();
         if (conf.isPresent()) {
             DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory(
@@ -341,7 +344,8 @@ public class DistributedLogServer {
             logger.info("Using thriftmux.");
             Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
                     Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
-            serverBuilder = serverBuilder.stack(ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
+            serverBuilder = serverBuilder.stack(
+                ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
         }
 
         logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index 1c3d8d4..55ed84f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -17,15 +17,26 @@
  */
 package com.twitter.distributedlog.service;
 
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
 import com.google.common.base.Function;
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.client.routing.RoutingUtils;
 import com.twitter.distributedlog.client.serverset.DLZkServerSet;
 import com.twitter.finagle.stats.NullStatsReceiver;
 import com.twitter.finagle.stats.StatsReceiver;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.bookkeeper.util.ReflectionUtils;
@@ -38,21 +49,14 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import static com.twitter.distributedlog.util.CommandLineUtils.*;
-
+/**
+ * The launcher of the distributedlog proxy server.
+ */
 public class DistributedLogServerApp {
 
-    static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
 
-    private final static String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
+    private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
     private final String[] args;
     private final Options options = new Options();
 
@@ -104,7 +108,8 @@ public class DistributedLogServerApp {
         }
     }
 
-    private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
+    private void runCmd(CommandLine cmdline)
+        throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
         final StatsReceiver statsReceiver = NullStatsReceiver.get();
         Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
         DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
@@ -113,7 +118,8 @@ public class DistributedLogServerApp {
             try {
                 dlConf.loadConf(new File(configFile).toURI().toURL());
             } catch (ConfigurationException e) {
-                throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + ".");
+                throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+                    + configFile + ".");
             } catch (MalformedURLException e) {
                 throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
                         + configFile + ".");
@@ -130,7 +136,7 @@ public class DistributedLogServerApp {
                 }).or(new NullStatsProvider());
 
         final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
-        Preconditions.checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
+        checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
         URI dlUri = URI.create(uriOption.get());
 
         DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 5dee7fd..db1346e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -47,7 +47,6 @@ import com.twitter.distributedlog.service.placement.PlacementPolicy;
 import com.twitter.distributedlog.service.placement.ZKPlacementStateManager;
 import com.twitter.distributedlog.service.stream.BulkWriteOp;
 import com.twitter.distributedlog.service.stream.DeleteOp;
-import com.twitter.distributedlog.service.stream.admin.CreateOp;
 import com.twitter.distributedlog.service.stream.HeartbeatOp;
 import com.twitter.distributedlog.service.stream.ReleaseOp;
 import com.twitter.distributedlog.service.stream.Stream;
@@ -59,8 +58,8 @@ import com.twitter.distributedlog.service.stream.StreamOp;
 import com.twitter.distributedlog.service.stream.StreamOpStats;
 import com.twitter.distributedlog.service.stream.TruncateOp;
 import com.twitter.distributedlog.service.stream.WriteOp;
-import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
 import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
+import com.twitter.distributedlog.service.stream.admin.CreateOp;
 import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
 import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
@@ -86,7 +85,6 @@ import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
 import com.twitter.util.ScheduledThreadPoolTimer;
 import com.twitter.util.Timer;
-
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
@@ -96,7 +94,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.Counter;
@@ -105,15 +102,17 @@ import org.apache.bookkeeper.stats.StatsLogger;
 import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.runtime.BoxedUnit;
 
+/**
+ * Implementation of distributedlog thrift service.
+ */
 public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
                                                   FatalErrorHandler {
 
-    static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
+    private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
 
-    private final int MOVING_AVERAGE_WINDOW_SECS = 60;
+    private static final int MOVING_AVERAGE_WINDOW_SECS = 60;
 
     private final ServerConfiguration serverConfig;
     private final DistributedLogConfiguration dlConfig;
@@ -294,8 +293,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
 
             @Override
             public Number getSample() {
-                return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable() ?
-                    3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
+                return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
+                    ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
             }
         };
         this.movingAvgRpsGauge = new Gauge<Number>() {
@@ -364,8 +363,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
         streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);
 
         // Setup complete
-        logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {}, dlsn version {}.",
-                new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
+        logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {},"
+            + " dlsn version {}.",
+            new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
     }
 
     private void countStatusCode(StatusCode code) {
@@ -440,7 +440,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     }
 
     @Override
-    public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) {
+    public Future<BulkWriteResponse> writeBulkWithContext(final String stream,
+                                                          List<ByteBuffer> data,
+                                                          WriteContext ctx) {
         bulkWritePendingStat.inc();
         receivedRecordCounter.add(data.size());
         BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
@@ -480,8 +482,14 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
 
     @Override
     public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
-        TruncateOp op = new TruncateOp(stream, DLSN.deserialize(dlsn), statsLogger, perStreamStatsLogger, getChecksum(ctx),
-            featureChecksumDisabled, accessControlManager);
+        TruncateOp op = new TruncateOp(
+            stream,
+            DLSN.deserialize(dlsn),
+            statsLogger,
+            perStreamStatsLogger,
+            getChecksum(ctx),
+            featureChecksumDisabled,
+            accessControlManager);
         executeStreamOp(op);
         return op.result();
     }
@@ -730,14 +738,14 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
     }
 
     /**
-     * clean up the gauge before we close to help GC
+     * clean up the gauge before we close to help GC.
      */
     private void unregisterGauge(){
-        this.statsLogger.unregisterGauge("proxy_status",this.proxyStatusGauge);
-        this.statsLogger.unregisterGauge("moving_avg_rps",this.movingAvgRpsGauge);
-        this.statsLogger.unregisterGauge("moving_avg_bps",this.movingAvgBpsGauge);
-        this.statsLogger.unregisterGauge("acquired",this.streamAcquiredGauge);
-        this.statsLogger.unregisterGauge("cached",this.streamCachedGauge);
+        this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge);
+        this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge);
+        this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge);
+        this.statsLogger.unregisterGauge("acquired", this.streamAcquiredGauge);
+        this.statsLogger.unregisterGauge("cached", this.streamCachedGauge);
     }
 
     @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
index e0a15e6..d6922b9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
@@ -17,13 +17,6 @@
  */
 package com.twitter.distributedlog.service;
 
-import com.google.common.base.Optional;
-import com.twitter.util.Future;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
 /**
  * Implement handling for an unrecoverable error.
  */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
index 7edb778..4ff5b87 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
@@ -17,9 +17,9 @@
  */
 package com.twitter.distributedlog.service;
 
+import static com.google.common.base.Preconditions.checkArgument;
 
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Sets;
 import com.google.common.hash.HashFunction;
@@ -41,14 +41,6 @@ import com.twitter.finagle.stats.StatsReceiver;
 import com.twitter.finagle.thrift.ClientId$;
 import com.twitter.util.Duration;
 import com.twitter.util.FutureEventListener;
-
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.MalformedURLException;
@@ -65,10 +57,19 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * Monitor Service.
+ */
 public class MonitorService implements NamespaceListener {
 
-    static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
+    private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
 
     private DistributedLogNamespace dlNamespace = null;
     private MonitorServiceClient dlClient = null;
@@ -271,9 +272,9 @@ public class MonitorService implements NamespaceListener {
     }
 
     public void runServer() throws IllegalArgumentException, IOException {
-        Preconditions.checkArgument(uriArg.isPresent(),
+        checkArgument(uriArg.isPresent(),
                 "No distributedlog uri provided.");
-        Preconditions.checkArgument(serverSetArg.isPresent(),
+        checkArgument(serverSetArg.isPresent(),
                 "No proxy server set provided.");
         if (intervalArg.isPresent()) {
             interval = intervalArg.get();
@@ -422,7 +423,7 @@ public class MonitorService implements NamespaceListener {
     }
 
     /**
-     * Close the server
+     * Close the server.
      */
     public void close() {
         logger.info("Closing monitor service.");
@@ -459,7 +460,7 @@ public class MonitorService implements NamespaceListener {
     }
 
     /**
-     * clean up the gauge before we close to help GC
+     * Clean up the gauge before we close to help GC.
      */
     private void unregisterGauge(){
         statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
index a51a6a9..b5b4ca8 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
@@ -17,8 +17,13 @@
  */
 package com.twitter.distributedlog.service;
 
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
 import com.twitter.finagle.stats.NullStatsReceiver;
 import com.twitter.finagle.stats.StatsReceiver;
+import java.io.IOException;
 import org.apache.bookkeeper.stats.NullStatsProvider;
 import org.apache.bookkeeper.stats.StatsProvider;
 import org.apache.bookkeeper.util.ReflectionUtils;
@@ -30,15 +35,16 @@ import org.apache.commons.cli.ParseException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
-import static com.twitter.distributedlog.util.CommandLineUtils.*;
 
+/**
+ * The launcher to run monitor service.
+ */
 public class MonitorServiceApp {
 
-    static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
+    private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
+
+    static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
 
-    final static String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
     final String[] args;
     final Options options = new Options();
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
index 798dcf5..d779cd0 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
@@ -18,7 +18,7 @@
 package com.twitter.distributedlog.service;
 
 /**
- * List of feature keys used by distributedlog server
+ * List of feature keys used by distributedlog server.
  */
 public enum ServerFeatureKeys {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
index 6c570f6..bd0a992 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
@@ -18,17 +18,13 @@
 package com.twitter.distributedlog.service;
 
 import com.google.common.base.Stopwatch;
-
 import com.twitter.finagle.Service;
 import com.twitter.finagle.SimpleFilter;
 import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-
-import org.apache.bookkeeper.stats.StatsLogger;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.StatsLogger;
 
 /**
  * Track distributedlog server finagle-service stats.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
index 89e0665..cb37088 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
@@ -20,7 +20,7 @@ package com.twitter.distributedlog.service.announcer;
 import java.io.IOException;
 
 /**
- * Announce service information
+ * Announce service information.
  */
 public interface Announcer {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
index c686408..471f954 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
@@ -19,6 +19,9 @@ package com.twitter.distributedlog.service.announcer;
 
 import java.io.IOException;
 
+/**
+ * A no-op implementation of {@link Announcer}.
+ */
 public class NOPAnnouncer implements Announcer {
     @Override
     public void announce() throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
index ada8710..eaf4c26 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
@@ -20,9 +20,6 @@ package com.twitter.distributedlog.service.announcer;
 import com.twitter.common.zookeeper.Group;
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.distributedlog.client.serverset.DLZkServerSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
@@ -30,10 +27,15 @@ import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.HashMap;
 import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+/**
+ * ServerSet based announcer.
+ */
 public class ServerSetAnnouncer implements Announcer {
 
-    static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
+    private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
 
     final String localAddr;
     final InetSocketAddress serviceEndpoint;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
new file mode 100644
index 0000000..bca36df
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Announcers to announce servers to server set.
+ */
+package com.twitter.distributedlog.service.announcer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
index 9dd84d0..3ffe54b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
@@ -20,6 +20,11 @@ package com.twitter.distributedlog.service.balancer;
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.RateLimiter;
 
+/**
+ * Balancer Interface.
+ *
+ * <p>A balancer is used to balance the streams across the proxy cluster.
+ */
 public interface Balancer {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
index 2e49d92..48430df 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
@@ -17,8 +17,9 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.RateLimiter;
 import com.twitter.common.zookeeper.ServerSet;
 import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
@@ -33,6 +34,8 @@ import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.thrift.ClientId$;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
@@ -40,15 +43,12 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.net.URI;
-
 /**
- * Tool to rebalance cluster
+ * Tool to rebalance cluster.
  */
 public class BalancerTool extends Tool {
 
-    static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
+    private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
 
     static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
         return DistributedLogClientBuilder.newBuilder()
@@ -66,6 +66,9 @@ public class BalancerTool extends Tool {
                                 .failFast(false));
     }
 
+    /**
+     * Base Command to run balancer.
+     */
     protected abstract static class BalancerCommand extends OptsCommand {
 
         protected Options options = new Options();
@@ -78,7 +81,8 @@ public class BalancerTool extends Tool {
         BalancerCommand(String name, String description) {
             super(name, description);
             options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
-            options.addOption("rtp", "rebalance-tolerance-percentage", true, "Rebalance tolerance percentage per proxy");
+            options.addOption("rtp", "rebalance-tolerance-percentage", true,
+                "Rebalance tolerance percentage per proxy");
             options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
             options.addOption("r", "rate", true, "Rebalance rate");
         }
@@ -105,11 +109,11 @@ public class BalancerTool extends Tool {
             if (cmdline.hasOption("r")) {
                 this.rate = Double.parseDouble(cmdline.getOptionValue("r"));
             }
-            Preconditions.checkArgument(rebalanceWaterMark >= 0,
+            checkArgument(rebalanceWaterMark >= 0,
                     "Rebalance Water Mark should be a non-negative number");
-            Preconditions.checkArgument(rebalanceTolerancePercentage >= 0.0f,
+            checkArgument(rebalanceTolerancePercentage >= 0.0f,
                     "Rebalance Tolerance Percentage should be a non-negative number");
-            Preconditions.checkArgument(rebalanceConcurrency > 0,
+            checkArgument(rebalanceConcurrency > 0,
                     "Rebalance Concurrency should be a positive number");
             if (null == rate || rate <= 0.0f) {
                 rateLimiter = Optional.absent();
@@ -133,6 +137,9 @@ public class BalancerTool extends Tool {
         protected abstract int executeCommand(CommandLine cmdline) throws Exception;
     }
 
+    /**
+     * Command to balance streams within a cluster.
+     */
     protected static class ClusterBalancerCommand extends BalancerCommand {
 
         protected URI uri;
@@ -188,7 +195,11 @@ public class BalancerTool extends Tool {
                                   ClusterBalancer balancer)
                 throws Exception {
             if (null == source) {
-                balancer.balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, getRateLimiter());
+                balancer.balance(
+                    rebalanceWaterMark,
+                    rebalanceTolerancePercentage,
+                    rebalanceConcurrency,
+                    getRateLimiter());
             } else {
                 balanceFromSource(clientBuilder, balancer, source, getRateLimiter());
             }
@@ -217,6 +228,9 @@ public class BalancerTool extends Tool {
         }
     }
 
+    /**
+     * Command to balance streams between regions.
+     */
     protected static class RegionBalancerCommand extends BalancerCommand {
 
         protected URI region1;
@@ -288,7 +302,11 @@ public class BalancerTool extends Tool {
 
         protected int runBalancer(SimpleBalancer balancer) throws Exception {
             if (null == source) {
-                balancer.balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, getRateLimiter());
+                balancer.balance(
+                    rebalanceWaterMark,
+                    rebalanceTolerancePercentage,
+                    rebalanceConcurrency,
+                    getRateLimiter());
             } else {
                 balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter());
             }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
index 6fef648..3a3dc1f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
@@ -28,10 +28,6 @@ import com.twitter.util.Await;
 import com.twitter.util.Function;
 import com.twitter.util.Future;
 import com.twitter.util.FutureEventListener;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.Serializable;
 import java.net.SocketAddress;
 import java.util.ArrayList;
@@ -42,13 +38,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * A balancer balances ownerships with a cluster of targets
+ * A balancer balances ownerships with a cluster of targets.
  */
 public class ClusterBalancer implements Balancer {
 
-    static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
+    private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
 
     /**
      * Represent a single host. Ordered by number of streams in desc order.
@@ -205,7 +204,8 @@ public class ClusterBalancer implements Balancer {
             }
 
             int moveFromLowWaterMark;
-            int moveToHighWaterMark = Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
+            int moveToHighWaterMark =
+                Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
 
             if (hostIdxMoveFrom >= 0) {
                 moveFromLowWaterMark = Math.max(0, rebalanceWaterMark);
@@ -220,7 +220,8 @@ public class ClusterBalancer implements Balancer {
                 AtomicInteger moveFrom = new AtomicInteger(0);
                 AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1);
                 while (moveFrom.get() < moveTo.get()) {
-                    moveStreams(hosts, moveFrom, moveFromLowWaterMark, moveTo, moveToHighWaterMark, rebalanceRateLimiter);
+                    moveStreams(hosts, moveFrom, moveFromLowWaterMark,
+                        moveTo, moveToHighWaterMark, rebalanceRateLimiter);
                     moveFrom.incrementAndGet();
                 }
             }
@@ -244,8 +245,14 @@ public class ClusterBalancer implements Balancer {
         }
 
         if (logger.isDebugEnabled()) {
-            logger.debug("Moving streams : hosts = {}, from = {}, to = {} : from_low_water_mark = {}, to_high_water_mark = {}",
-                         new Object[] { hosts, hostIdxMoveFrom.get(), hostIdxMoveTo.get(), moveFromLowWaterMark, moveToHighWaterMark });
+            logger.debug("Moving streams : hosts = {}, from = {}, to = {} :"
+                + " from_low_water_mark = {}, to_high_water_mark = {}",
+                new Object[] {
+                    hosts,
+                    hostIdxMoveFrom.get(),
+                    hostIdxMoveTo.get(),
+                    moveFromLowWaterMark,
+                    moveToHighWaterMark });
         }
 
         Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
index 0a267ea..fab37b3 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
@@ -17,8 +17,7 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.tuple.Pair;
+import static com.google.common.base.Preconditions.checkArgument;
 
 import java.io.Serializable;
 import java.net.SocketAddress;
@@ -29,7 +28,11 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
 
+/**
+ * A stream chooser based on number of streams.
+ */
 class CountBasedStreamChooser implements StreamChooser, Serializable,
         Comparator<Pair<SocketAddress, LinkedList<String>>> {
 
@@ -46,7 +49,7 @@ class CountBasedStreamChooser implements StreamChooser, Serializable,
     int next;
 
     CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) {
-        Preconditions.checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
+        checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
         streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size());
         for (Map.Entry<SocketAddress, Set<String>> entry : streams.entrySet()) {
             LinkedList<String> randomizedStreams = new LinkedList<String>(entry.getValue());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
index d0e294d..069e596 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
@@ -17,8 +17,18 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
+/**
+ * A stream chooser that can only choose a limited number of streams.
+ */
 public class LimitedStreamChooser implements StreamChooser {
 
+    /**
+     * Create a limited stream chooser by {@code limit}.
+     *
+     * @param underlying the underlying stream chooser.
+     * @param limit the limit of number of streams to choose.
+     * @return the limited stream chooser.
+     */
     public static LimitedStreamChooser of(StreamChooser underlying, int limit) {
         return new LimitedStreamChooser(underlying, limit);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
index 6913e4a..b205d5f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
@@ -21,9 +21,6 @@ import com.google.common.base.Optional;
 import com.google.common.util.concurrent.RateLimiter;
 import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
 import com.twitter.distributedlog.service.DistributedLogClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.Map;
@@ -31,13 +28,15 @@ import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A balancer balances ownerships between two targets.
  */
 public class SimpleBalancer implements Balancer {
 
-    static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
+    private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
 
     protected final String target1;
     protected final String target2;
@@ -120,8 +119,8 @@ public class SimpleBalancer implements Balancer {
         loadDistribution.put(target, targetStreamCount);
 
         // Calculate how many streams to be rebalanced from src region to target region
-        int numStreamsToRebalance =
-                BalancerUtils.calculateNumStreamsToRebalance(source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
+        int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance(
+            source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
 
         if (numStreamsToRebalance <= 0) {
             logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target);
@@ -130,7 +129,8 @@ public class SimpleBalancer implements Balancer {
 
         StreamChooser streamChooser =
                 LimitedStreamChooser.of(new CountBasedStreamChooser(srcDistribution), numStreamsToRebalance);
-        StreamMover streamMover = new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
+        StreamMover streamMover =
+            new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
 
         moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
     }
@@ -166,7 +166,8 @@ public class SimpleBalancer implements Balancer {
         }
 
         StreamChooser streamChooser = new CountBasedStreamChooser(distribution);
-        StreamMover streamMover = new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
+        StreamMover streamMover =
+            new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
 
         moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
     }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
index e0fcaf1..d92aef0 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
@@ -18,7 +18,7 @@
 package com.twitter.distributedlog.service.balancer;
 
 /**
- * Choose a stream to rebalance
+ * Choose a stream to rebalance.
  */
 public interface StreamChooser {
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
index 6857560..6e4205b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
@@ -17,10 +17,13 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
+/**
+ * A stream mover to move streams between proxies.
+ */
 public interface StreamMover {
 
     /**
-     * Move given stream <i>streamName</i>
+     * Move given stream <i>streamName</i>.
      *
      * @param streamName
      *          stream name to move

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
index 75df4ea..fc67fb2 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
@@ -48,7 +48,7 @@ public class StreamMoverImpl implements StreamMover {
     }
 
     /**
-     * Move given stream <i>streamName</i>
+     * Move given stream <i>streamName</i>.
      *
      * @param streamName
      *          stream name to move

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
new file mode 100644
index 0000000..4ae8d44
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Balancer to move streams around to balance the traffic.
+ */
+package com.twitter.distributedlog.service.balancer;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
index d612195..b45b798 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
@@ -25,34 +25,41 @@ import com.twitter.distributedlog.config.ConfigurationSubscription;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.config.FileConfigurationBuilder;
 import com.twitter.distributedlog.config.PropertiesConfigurationBuilder;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.net.MalformedURLException;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * For all streams return the same dynamic config based on configFile.
  */
 public class DefaultStreamConfigProvider implements StreamConfigProvider {
-    static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
 
     private final Optional<DynamicDistributedLogConfiguration> dynConf;
     private final ConfigurationSubscription confSub;
 
-    public DefaultStreamConfigProvider(String configFilePath, ScheduledExecutorService executorService, int reloadPeriod,
-                                       TimeUnit reloadUnit) throws ConfigurationException {
+    public DefaultStreamConfigProvider(String configFilePath,
+                                       ScheduledExecutorService executorService,
+                                       int reloadPeriod,
+                                       TimeUnit reloadUnit)
+        throws ConfigurationException {
         try {
             File configFile = new File(configFilePath);
-            FileConfigurationBuilder properties = new PropertiesConfigurationBuilder(configFile.toURI().toURL());
-            ConcurrentConstConfiguration defaultConf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
-            DynamicDistributedLogConfiguration conf = new DynamicDistributedLogConfiguration(defaultConf);
+            FileConfigurationBuilder properties =
+                new PropertiesConfigurationBuilder(configFile.toURI().toURL());
+            ConcurrentConstConfiguration defaultConf =
+                new ConcurrentConstConfiguration(new DistributedLogConfiguration());
+            DynamicDistributedLogConfiguration conf =
+                new DynamicDistributedLogConfiguration(defaultConf);
             List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(properties);
-            confSub = new ConfigurationSubscription(conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit);
+            confSub = new ConfigurationSubscription(
+                conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit);
             this.dynConf = Optional.of(conf);
         } catch (MalformedURLException ex) {
             throw new ConfigurationException(ex);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
index 5b19f6c..b3b4c4e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
@@ -17,7 +17,8 @@
  */
 package com.twitter.distributedlog.service.config;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogConstants;
@@ -29,7 +30,7 @@ import org.apache.commons.configuration.ConfigurationException;
 import org.apache.commons.configuration.SystemConfiguration;
 
 /**
- * Configuration for DistributedLog Server
+ * Configuration for DistributedLog Server.
  */
 public class ServerConfiguration extends CompositeConfiguration {
 
@@ -43,36 +44,36 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     // Server DLSN version
-    protected final static String SERVER_DLSN_VERSION = "server_dlsn_version";
-    protected final static byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
+    protected static final String SERVER_DLSN_VERSION = "server_dlsn_version";
+    protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
 
     // Server Durable Write Enable/Disable Flag
-    protected final static String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled";
-    protected final static boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
+    protected static final String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled";
+    protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
 
     // Server Region Id
-    protected final static String SERVER_REGION_ID = "server_region_id";
-    protected final static int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID;
+    protected static final String SERVER_REGION_ID = "server_region_id";
+    protected static final int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID;
 
     // Server Port
-    protected final static String SERVER_PORT = "server_port";
-    protected final static int SERVER_PORT_DEFAULT = 0;
+    protected static final String SERVER_PORT = "server_port";
+    protected static final int SERVER_PORT_DEFAULT = 0;
 
     // Server Shard Id
-    protected final static String SERVER_SHARD_ID = "server_shard";
-    protected final static int SERVER_SHARD_ID_DEFAULT = -1;
+    protected static final String SERVER_SHARD_ID = "server_shard";
+    protected static final int SERVER_SHARD_ID_DEFAULT = -1;
 
     // Server Threads
-    protected final static String SERVER_NUM_THREADS = "server_threads";
-    protected final static int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
+    protected static final String SERVER_NUM_THREADS = "server_threads";
+    protected static final int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
 
     // Server enable per stream stat
-    protected final static String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat";
-    protected final static boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
+    protected static final String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat";
+    protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
 
     // Server graceful shutdown period (in millis)
-    protected final static String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms";
-    protected final static long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L;
+    protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms";
+    protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L;
 
     // Server service timeout
     public static final String SERVER_SERVICE_TIMEOUT_MS = "server_service_timeout_ms";
@@ -86,17 +87,18 @@ public class ServerConfiguration extends CompositeConfiguration {
     // Server stream probation timeout
     public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms";
     public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs";
-    public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60*1000*5;
+    public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 1000 * 5;
 
     // Server stream to partition converter
-    protected final static String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class";
+    protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class";
 
     // Use hostname as the allocator pool name
-    protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME
-        = "server_use_hostname_as_allocator_pool_name";
+    protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME =
+        "server_use_hostname_as_allocator_pool_name";
     protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
     //Configure refresh interval for calculating resource placement in seconds
-    public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = "server_resource_placement_refresh_interval_sec";
+    public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S =
+        "server_resource_placement_refresh_interval_sec";
     public static final int  SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
 
     public ServerConfiguration() {
@@ -105,7 +107,7 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Load configurations from {@link DistributedLogConfiguration}
+     * Load configurations from {@link DistributedLogConfiguration}.
      *
      * @param dlConf
      *          distributedlog configuration
@@ -137,7 +139,7 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Set the flag to enable/disable durable write
+     * Set the flag to enable/disable durable write.
      *
      * @param enabled
      *          flag to enable/disable durable write
@@ -149,7 +151,7 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Is durable write enabled?
+     * Is durable write enabled.
      *
      * @return true if waiting writes to be durable. otherwise false.
      */
@@ -158,7 +160,7 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Set the region id used to instantiate DistributedLogNamespace
+     * Set the region id used to instantiate DistributedLogNamespace.
      *
      * @param regionId
      *          region id
@@ -170,8 +172,7 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Get the region id used to instantiate
-     * {@link com.twitter.distributedlog.namespace.DistributedLogNamespace}
+     * Get the region id used to instantiate {@link com.twitter.distributedlog.namespace.DistributedLogNamespace}.
      *
      * @return region id used to instantiate DistributedLogNamespace
      */
@@ -213,8 +214,9 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Get the shard id of this server. It would be used to instantiate the client id
-     * used for DistributedLogNamespace.
+     * Get the shard id of this server.
+     *
+     * <p>It would be used to instantiate the client id used for DistributedLogNamespace.
      *
      * @return shard id of this server.
      */
@@ -286,7 +288,9 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Get timeout for stream op execution in proxy layer. 0 disables timeout.
+     * Get timeout for stream op execution in proxy layer.
+     *
+     * <p>0 disables timeout.
      *
      * @return timeout for stream operation in proxy layer.
      */
@@ -296,7 +300,9 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Set timeout for stream op execution in proxy layer. 0 disables timeout.
+     * Set timeout for stream op execution in proxy layer.
+     *
+     * <p>0 disables timeout.
      *
      * @param timeoutMs
      *          timeout for stream operation in proxy layer.
@@ -308,7 +314,9 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Get timeout for closing writer in proxy layer. 0 disables timeout.
+     * Get timeout for closing writer in proxy layer.
+     *
+     * <p>0 disables timeout.
      *
      * @return timeout for closing writer in proxy layer.
      */
@@ -317,7 +325,9 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Set timeout for closing writer in proxy layer. 0 disables timeout.
+     * Set timeout for closing writer in proxy layer.
+     *
+     * <p>0 disables timeout.
      *
      * @param timeoutMs
      *          timeout for closing writer in proxy layer.
@@ -329,8 +339,9 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * After service timeout, how long should stream be kept in cache in probationary state in order
-     * to prevent reacquire. In millisec.
+     * How long should stream be kept in cache in probationary state after service timeout.
+     *
+     * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
      *
      * @return stream probation timeout in ms.
      */
@@ -340,10 +351,12 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * After service timeout, how long should stream be kept in cache in probationary state in order
-     * to prevent reacquire. In millisec.
+     * How long should stream be kept in cache in probationary state after service timeout.
+     *
+     * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
      *
      * @param timeoutMs probation timeout in ms.
+     * @return server configuration
      */
     public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) {
         setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs);
@@ -357,7 +370,8 @@ public class ServerConfiguration extends CompositeConfiguration {
      *          stream partition converter class
      * @return server configuration
      */
-    public ServerConfiguration setStreamPartitionConverterClass(Class<? extends StreamPartitionConverter> converterClass) {
+    public ServerConfiguration setStreamPartitionConverterClass(
+        Class<? extends StreamPartitionConverter> converterClass) {
         setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, converterClass.getName());
         return this;
     }
@@ -391,7 +405,7 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Get if use hostname as the allocator pool name
+     * Get if use hostname as the allocator pool name.
      *
      * @return true if use hostname as the allocator pool name. otherwise, use
      * {@link #getServerShardId()} as the allocator pool name.
@@ -412,15 +426,17 @@ public class ServerConfiguration extends CompositeConfiguration {
     }
 
     /**
-     * Validate the configuration
+     * Validate the configuration.
+     *
+     * @throws IllegalArgumentException when there are any invalid settings.
      */
     public void validate() {
         byte dlsnVersion = getDlsnVersion();
-        Preconditions.checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1,
+        checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1,
                 "Unknown dlsn version " + dlsnVersion);
-        Preconditions.checkArgument(getServerThreads() > 0,
+        checkArgument(getServerThreads() > 0,
                 "Invalid number of server threads : " + getServerThreads());
-        Preconditions.checkArgument(getServerShardId() >= 0,
+        checkArgument(getServerShardId() >= 0,
                 "Invalid server shard id : " + getServerShardId());
     }
 



[2/4] incubator-distributedlog git commit: DL-132: Enable check style for distributedlog service module.

Posted by si...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 36904fd..55c1b48 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -20,11 +20,11 @@ package com.twitter.distributedlog.service.stream;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.DistributedLogManager;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.OverCapacityException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
@@ -51,6 +51,12 @@ import com.twitter.util.FutureEventListener;
 import com.twitter.util.Promise;
 import com.twitter.util.TimeoutException;
 import com.twitter.util.Timer;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.stats.Counter;
@@ -65,20 +71,17 @@ import org.slf4j.LoggerFactory;
 import scala.runtime.AbstractFunction1;
 import scala.runtime.BoxedUnit;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+/**
+ * Implementation of {@link Stream}.
+ */
 public class StreamImpl implements Stream {
-    static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
 
     /**
      * The status of the stream.
      *
-     * The status change of the stream should just go in one direction. If a stream hits
+     * <p>The status change of the stream should just go in one direction. If a stream hits
      * any error, the stream should be put in error state. If a stream is in error state,
      * it should be removed and not reused anymore.
      */
@@ -405,7 +408,7 @@ public class StreamImpl implements Stream {
                     // Stream is closed, fail the op immediately
                     op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
                     return;
-                } if (StreamStatus.INITIALIZED == status) {
+                } else if (StreamStatus.INITIALIZED == status) {
                     completeOpNow = true;
                     success = true;
                 } else if (failFastOnStreamNotReady) {
@@ -551,7 +554,8 @@ public class StreamImpl implements Stream {
     Future<Boolean> acquireStream() {
         final Stopwatch stopwatch = Stopwatch.createStarted();
         final Promise<Boolean> acquirePromise = new Promise<Boolean>();
-        manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
+        manager.openAsyncLogWriter().addEventListener(
+            FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
 
             @Override
             public void onSuccess(AsyncLogWriter w) {
@@ -748,8 +752,8 @@ public class StreamImpl implements Stream {
         final boolean abort;
         closeLock.writeLock().lock();
         try {
-            if (StreamStatus.CLOSING == status ||
-                StreamStatus.CLOSED == status) {
+            if (StreamStatus.CLOSING == status
+                || StreamStatus.CLOSED == status) {
                 return closePromise;
             }
             logger.info("Request to close stream {} : {}", getStreamName(), reason);
@@ -875,7 +879,7 @@ public class StreamImpl implements Stream {
     }
 
     /**
-     * clean up the gauge to help GC
+     * clean up the gauge to help GC.
      */
     private void unregisterGauge(){
         streamLogger.unregisterGauge("stream_status", this.streamStatusGauge);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
index e171e46..7f7d44e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
@@ -26,9 +26,9 @@ import java.util.Map;
 /**
  * Manage lifecycle of streams.
  *
- * StreamManager is responsible for creating, destroying, and keeping track of Stream objects.
+ * <p>StreamManager is responsible for creating, destroying, and keeping track of Stream objects.
  *
- * Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the
+ * <p>Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the
  * per stream request handlers, responsible fo dispatching ex. write requests to an underlying AsyncLogWriter,
  * managing stream lock, interpreting exceptions, error conditions, and etc.
  */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
index df336fe..8b36d3b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
@@ -55,13 +55,14 @@ import org.slf4j.LoggerFactory;
  * StreamManagerImpl is the default implementation responsible for creating, destroying, and keeping track
  * of Streams.
  *
- * StreamFactory, supplied to StreamManagerImpl in the constructor below, is reposible simply for creating
+ * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, is responsible simply for creating
  * a stream object in isolation from the rest of the system. We pass a StreamFactory in instead of simply
  * creating StreamImpl's ourselves in order to inject dependencies without bloating the StreamManagerImpl
  * constructor.
  */
 public class StreamManagerImpl implements StreamManager {
-    static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class);
+
+    private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class);
 
     private final ConcurrentHashMap<String, Stream> streams =
         new ConcurrentHashMap<String, Stream>();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java
index de01f9f..a2cbc80 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java
@@ -24,8 +24,6 @@ import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
 
-import java.nio.ByteBuffer;
-
 /**
  * An operation applied to a stream.
  */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
index bfbc88c..c1019c6 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
@@ -17,8 +17,8 @@
  */
 package com.twitter.distributedlog.service.stream;
 
-import com.twitter.distributedlog.stats.BroadCastStatsLogger;
 import com.twitter.distributedlog.service.streamset.Partition;
+import com.twitter.distributedlog.stats.BroadCastStatsLogger;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java
index f453dc2..b0b4df2 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java
@@ -19,26 +19,27 @@ package com.twitter.distributedlog.service.stream;
 
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.acl.AccessControlManager;
 import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.RequestDeniedException;
 import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Operation to truncate a log stream.
+ */
 public class TruncateOp extends AbstractWriteOp {
 
-    static final Logger logger = LoggerFactory.getLogger(TruncateOp.class);
+    private static final Logger logger = LoggerFactory.getLogger(TruncateOp.class);
 
     private final Counter deniedTruncateCounter;
     private final DLSN dlsn;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
index e9f2f4e..69739dc 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
@@ -20,35 +20,36 @@ package com.twitter.distributedlog.service.stream;
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.RequestDeniedException;
 import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.streamset.Partition;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
-import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.thrift.service.StatusCode;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.util.Sequencer;
-import com.twitter.util.FutureEventListener;
 import com.twitter.util.Future;
-
+import com.twitter.util.FutureEventListener;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Operation to write a single record to a log stream.
+ */
 public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {
-    static final Logger logger = LoggerFactory.getLogger(WriteOp.class);
+
+    private static final Logger logger = LoggerFactory.getLogger(WriteOp.class);
 
     private final byte[] payload;
     private final boolean isRecordSet;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java
index eef4811..6cc9063 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java
@@ -17,6 +17,9 @@
  */
 package com.twitter.distributedlog.service.stream;
 
+/**
+ * A write operation with payload.
+ */
 public interface WriteOpWithPayload {
 
     // Return the payload size in bytes

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
index 7ac4986..6d2d2ea 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
@@ -21,9 +21,9 @@ import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.util.Future;
 
 /**
- * An admin operation
+ * Admin operation interface.
  */
-public interface AdminOp<Response> {
+public interface AdminOp<RespT> {
 
     /**
      * Invoked before the stream op is executed.
@@ -35,6 +35,6 @@ public interface AdminOp<Response> {
      *
      * @return the future represents the response of the operation
      */
-    Future<Response> execute();
+    Future<RespT> execute();
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
index 2e1f490..478201e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
@@ -17,6 +17,8 @@
  */
 package com.twitter.distributedlog.service.stream.admin;
 
+import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat;
+
 import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.service.stream.StreamManager;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
@@ -25,8 +27,9 @@ import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.StatsLogger;
 import scala.runtime.AbstractFunction1;
 
-import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat;
-
+/**
+ * Operation to create log stream.
+ */
 public class CreateOp extends StreamAdminOp {
 
   public CreateOp(String stream,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
index 37c6e14..4fad542 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -26,13 +26,12 @@ import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.util.Future;
 import com.twitter.util.FutureTransformer;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 
-import java.util.concurrent.TimeUnit;
-
 /**
- * Stream admin op
+ * Stream admin op.
  */
 public abstract class StreamAdminOp implements AdminOp<WriteResponse> {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java
new file mode 100644
index 0000000..5ec997c
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream Related Admin Operations.
+ */
+package com.twitter.distributedlog.service.stream.admin;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
index be8c457..d684de5 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
@@ -20,32 +20,31 @@ package com.twitter.distributedlog.service.stream.limiter;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.OverCapacityException;
 import com.twitter.distributedlog.limiter.RequestLimiter;
-
 import java.io.Closeable;
-
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.commons.configuration.event.ConfigurationEvent;
 import org.apache.commons.configuration.event.ConfigurationListener;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Dynamically rebuild a rate limiter when the supplied dynamic config changes. Subclasses
- * implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister
+ * Dynamically rebuild a rate limiter when the supplied dynamic config changes.
+ *
+ * <p>Subclasses implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister
  * the config listener.
  */
-public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<Request>, Closeable {
-    static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class);
+public abstract class DynamicRequestLimiter<Req> implements RequestLimiter<Req>, Closeable {
+    private static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class);
 
     private final ConfigurationListener listener;
     private final Feature rateLimitDisabledFeature;
-    volatile RequestLimiter<Request> limiter;
+    volatile RequestLimiter<Req> limiter;
     final DynamicDistributedLogConfiguration dynConf;
 
     public DynamicRequestLimiter(DynamicDistributedLogConfiguration dynConf,
-                                 StatsLogger statsLogger, Feature rateLimitDisabledFeature) {
+                                 StatsLogger statsLogger,
+                                 Feature rateLimitDisabledFeature) {
         final StatsLogger limiterStatsLogger = statsLogger.scope("dynamic");
         this.dynConf = dynConf;
         this.rateLimitDisabledFeature = rateLimitDisabledFeature;
@@ -74,7 +73,7 @@ public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<R
     }
 
     @Override
-    public void apply(Request request) throws OverCapacityException {
+    public void apply(Req request) throws OverCapacityException {
         if (rateLimitDisabledFeature.isAvailable()) {
             return;
         }
@@ -91,5 +90,5 @@ public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<R
     * Build the underlying limiter. Called when DynamicRequestLimiter detects config has changed.
     * This may be called multiple times so the method should be cheap.
     */
-    protected abstract RequestLimiter<Request> build();
+    protected abstract RequestLimiter<Req> build();
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
index 157b1ec..c1a37bb 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
@@ -17,26 +17,32 @@
  */
 package com.twitter.distributedlog.service.stream.limiter;
 
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 import com.twitter.distributedlog.exceptions.OverCapacityException;
 import com.twitter.distributedlog.limiter.ComposableRequestLimiter;
-import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
 import com.twitter.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
+import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
 import com.twitter.distributedlog.limiter.GuavaRateLimiter;
 import com.twitter.distributedlog.limiter.RateLimiter;
 import com.twitter.distributedlog.limiter.RequestLimiter;
 import com.twitter.distributedlog.service.stream.StreamOp;
 import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
-
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 
+/**
+ * Request limiter builder.
+ */
 public class RequestLimiterBuilder {
     private OverlimitFunction<StreamOp> overlimitFunction = NOP_OVERLIMIT_FUNCTION;
     private RateLimiter limiter;
     private CostFunction<StreamOp> costFunction;
     private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
 
+    /**
+     * Function to calculate the `RPS` (Requests per second) cost of a given stream operation.
+     */
     public static final CostFunction<StreamOp> RPS_COST_FUNCTION = new CostFunction<StreamOp>() {
         @Override
         public int apply(StreamOp op) {
@@ -48,6 +54,9 @@ public class RequestLimiterBuilder {
         }
     };
 
+    /**
+     * Function to calculate the `BPS` (Bytes per second) cost of a given stream operation.
+     */
     public static final CostFunction<StreamOp> BPS_COST_FUNCTION = new CostFunction<StreamOp>() {
         @Override
         public int apply(StreamOp op) {
@@ -60,6 +69,9 @@ public class RequestLimiterBuilder {
         }
     };
 
+    /**
+     * Function to check if a stream operation will cause {@link OverCapacityException}.
+     */
     public static final OverlimitFunction<StreamOp> NOP_OVERLIMIT_FUNCTION = new OverlimitFunction<StreamOp>() {
         @Override
         public void apply(StreamOp op) throws OverCapacityException {
@@ -96,9 +108,9 @@ public class RequestLimiterBuilder {
     }
 
     public RequestLimiter<StreamOp> build() {
-        Preconditions.checkNotNull(limiter);
-        Preconditions.checkNotNull(overlimitFunction);
-        Preconditions.checkNotNull(costFunction);
+        checkNotNull(limiter);
+        checkNotNull(overlimitFunction);
+        checkNotNull(costFunction);
         return new ComposableRequestLimiter(limiter, overlimitFunction, costFunction, statsLogger);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
index 69a8470..a3e1efb 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
@@ -22,9 +22,9 @@ import com.twitter.distributedlog.exceptions.OverCapacityException;
 import com.twitter.distributedlog.limiter.ChainedRequestLimiter;
 import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
 import com.twitter.distributedlog.limiter.RequestLimiter;
+import com.twitter.distributedlog.rate.MovingAverageRate;
 import com.twitter.distributedlog.service.stream.StreamManager;
 import com.twitter.distributedlog.service.stream.StreamOp;
-import com.twitter.distributedlog.rate.MovingAverageRate;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.StatsLogger;
 
@@ -89,8 +89,10 @@ public class ServiceRequestLimiter extends DynamicRequestLimiter<StreamOp> {
             .limit(bpsSoftServiceLimit);
 
         ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>();
-        builder.addLimiter(new StreamAcquireLimiter(streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire")));
-        builder.addLimiter(new StreamAcquireLimiter(streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire")));
+        builder.addLimiter(new StreamAcquireLimiter(
+            streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire")));
+        builder.addLimiter(new StreamAcquireLimiter(
+            streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire")));
         builder.addLimiter(bpsHardLimiterBuilder.build());
         builder.addLimiter(bpsSoftLimiterBuilder.build());
         builder.addLimiter(rpsHardLimiterBuilder.build());

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
index a417e81..5015751 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
@@ -20,13 +20,15 @@ package com.twitter.distributedlog.service.stream.limiter;
 import com.twitter.distributedlog.exceptions.OverCapacityException;
 import com.twitter.distributedlog.exceptions.TooManyStreamsException;
 import com.twitter.distributedlog.limiter.RequestLimiter;
+import com.twitter.distributedlog.rate.MovingAverageRate;
 import com.twitter.distributedlog.service.stream.StreamManager;
 import com.twitter.distributedlog.service.stream.StreamOp;
-import com.twitter.distributedlog.rate.MovingAverageRate;
-
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
 
+/**
+ * A special limiter that limits the acquiring of new streams.
+ */
 public class StreamAcquireLimiter implements RequestLimiter<StreamOp> {
     private final StreamManager streamManager;
     private final MovingAverageRate serviceRps;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
index b4836d1..fa601d1 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
@@ -19,13 +19,16 @@ package com.twitter.distributedlog.service.stream.limiter;
 
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.exceptions.OverCapacityException;
-import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
 import com.twitter.distributedlog.limiter.ChainedRequestLimiter;
+import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
 import com.twitter.distributedlog.limiter.RequestLimiter;
 import com.twitter.distributedlog.service.stream.StreamOp;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.StatsLogger;
 
+/**
+ * A dynamic request limiter that limits stream operations.
+ */
 public class StreamRequestLimiter extends DynamicRequestLimiter<StreamOp> {
     private final DynamicDistributedLogConfiguration dynConf;
     private final StatsLogger limiterStatLogger;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java
new file mode 100644
index 0000000..533c75a
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Request Rate Limiting.
+ */
+package com.twitter.distributedlog.service.stream.limiter;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java
new file mode 100644
index 0000000..389acd9
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream Related Operations.
+ */
+package com.twitter.distributedlog.service.stream;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
index de35fc2..9afd234 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
@@ -20,6 +20,9 @@ package com.twitter.distributedlog.service.streamset;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+/**
+ * A stream-to-partition converter that caches the mapping between stream and partitions.
+ */
 public abstract class CacheableStreamPartitionConverter implements StreamPartitionConverter {
 
     private final ConcurrentMap<String, Partition> partitions;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
index 4576560..d69a393 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
@@ -20,7 +20,7 @@ package com.twitter.distributedlog.service.streamset;
 import org.apache.commons.lang3.StringUtils;
 
 /**
- * Stream Partition Converter
+ * Stream Partition Converter that converts the stream name into a stream-to-partition mapping by delimiter.
  */
 public class DelimiterStreamPartitionConverter extends CacheableStreamPartitionConverter {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
index d199f88..770c631 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
@@ -22,6 +22,7 @@ import com.google.common.base.Objects;
 /**
  * `Partition` defines the relationship between a `virtual` stream and a
  * physical DL stream.
+ *
  * <p>A `virtual` stream could be partitioned into multiple partitions
  * and each partition is effectively a DL stream.
  */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java
index 9c882a6..1962e5f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java
@@ -22,6 +22,9 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * A mapping between a logical stream and a set of physical partitions.
+ */
 public class PartitionMap {
 
     private final Map<String, Set<Partition>> partitionMap;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java
new file mode 100644
index 0000000..3888e40
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * StreamSet - A logical set of streams.
+ */
+package com.twitter.distributedlog.service.streamset;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
index b37de10..8ff2f26 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
@@ -30,6 +30,10 @@ import com.twitter.finagle.builder.ClientBuilder;
 import com.twitter.finagle.thrift.ClientId$;
 import com.twitter.util.Await;
 import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
@@ -37,18 +41,16 @@ import org.apache.commons.lang3.tuple.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * Tools to interact with proxies.
  */
 public class ProxyTool extends Tool {
 
-    static final Logger logger = LoggerFactory.getLogger(ProxyTool.class);
+    private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class);
 
+    /**
+     * Abstract Cluster level command.
+     */
     protected abstract static class ClusterCommand extends OptsCommand {
 
         protected Options options = new Options();
@@ -59,8 +61,8 @@ public class ProxyTool extends Tool {
             super(name, description);
             options.addOption("u", "uri", true, "DistributedLog URI");
             options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'.");
-            options.addOption("e", "expression", true, "Expression to generate stream suffix. " +
-                    "Currently we support range '0-9', list '1,2,3' and name '143'");
+            options.addOption("e", "expression", true, "Expression to generate stream suffix. "
+                + "Currently we support range '0-9', list '1,2,3' and name '143'");
         }
 
         @Override
@@ -157,6 +159,9 @@ public class ProxyTool extends Tool {
         }
     }
 
+    /**
+     * Command to release ownership of a log stream.
+     */
     static class ReleaseCommand extends ClusterCommand {
 
         double rate = 100f;
@@ -196,6 +201,9 @@ public class ProxyTool extends Tool {
         }
     }
 
+    /**
+     * Command to truncate a log stream.
+     */
     static class TruncateCommand extends ClusterCommand {
 
         DLSN dlsn = DLSN.InitialDLSN;
@@ -234,6 +242,9 @@ public class ProxyTool extends Tool {
         }
     }
 
+    /**
+     * Abstract command to operate on a single proxy server.
+     */
     protected abstract static class ProxyCommand extends OptsCommand {
 
         protected Options options = new Options();
@@ -291,6 +302,9 @@ public class ProxyTool extends Tool {
         protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception;
     }
 
+    /**
+     * Command to enable/disable accepting new streams.
+     */
     static class AcceptNewStreamCommand extends ProxyCommand {
 
         boolean enabled = false;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java
new file mode 100644
index 0000000..1e32fd3
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Service related tools.
+ */
+package com.twitter.distributedlog.service.tools;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java
new file mode 100644
index 0000000..e6dcec6
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities used by proxy servers.
+ */
+package com.twitter.distributedlog.service.utils;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
index c08f0f0..8db3e90 100644
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
@@ -23,11 +23,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Extend the codahale metrics provider to run servlets
+ * Extend the codahale metrics provider to run servlets.
  */
 public class CodahaleMetricsServletProvider extends CodahaleMetricsProvider {
 
-    private final static Logger logger = LoggerFactory.getLogger(CodahaleMetricsServletProvider.class);
+    private static final Logger logger = LoggerFactory.getLogger(CodahaleMetricsServletProvider.class);
 
     ServletReporter servletReporter = null;
     private final HealthCheckRegistry healthCheckRegistry = new HealthCheckRegistry();

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
index 98cd8b3..348787a 100644
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
@@ -21,7 +21,7 @@ import com.codahale.metrics.health.HealthCheckRegistry;
 import com.codahale.metrics.servlets.HealthCheckServlet;
 
 /**
- * Health Check Servlet Listener
+ * Health Check Servlet Listener.
  */
 public class HealthCheckServletContextListener extends HealthCheckServlet.ContextListener {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
index e401ac0..15279fe 100644
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
@@ -20,6 +20,9 @@ package org.apache.bookkeeper.stats;
 import com.codahale.metrics.MetricRegistry;
 import com.codahale.metrics.servlets.MetricsServlet;
 
+/**
+ * A servlet context listener for the metrics servlet.
+ */
 public class MetricsServletContextListener extends MetricsServlet.ContextListener {
 
     private final MetricRegistry metricRegistry;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
index 9cf0610..267f75a 100644
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
@@ -25,7 +25,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 
 /**
- * Starts a jetty server on a configurable port to export stats
+ * Starts a jetty server on a configurable port to export stats.
  */
 public class ServletReporter {
 

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
new file mode 100644
index 0000000..d00b64d
--- /dev/null
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Extension of {@link org.apache.bookkeeper.stats.CodahaleMetricsProvider}.
+ */
+package org.apache.bookkeeper.stats;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
index 10941ba..922e901 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
@@ -20,7 +20,6 @@ package com.twitter.distributedlog.client.routing;
 import com.google.common.collect.Sets;
 import com.twitter.finagle.NoBrokersAvailableException;
 import com.twitter.finagle.stats.StatsReceiver;
-
 import java.net.SocketAddress;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
@@ -28,12 +27,18 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CopyOnWriteArrayList;
 
+/**
+ * A local routing service that is used for testing.
+ */
 public class LocalRoutingService implements RoutingService {
 
     public static Builder newBuilder() {
         return new Builder();
     }
 
+    /**
+     * Builder to build a local routing service for testing.
+     */
     public static class Builder implements RoutingService.Builder {
 
         private Builder() {}

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
index 4d29f21..f7e81dc 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
@@ -17,39 +17,38 @@
  */
 package com.twitter.distributedlog.service;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import com.google.common.base.Optional;
 import com.google.common.collect.Sets;
+import com.twitter.distributedlog.DLMTestUtil;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.client.DistributedLogClientImpl;
 import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
 import com.twitter.distributedlog.client.routing.LocalRoutingService;
 import com.twitter.distributedlog.client.routing.RegionsRoutingService;
 import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
-import com.twitter.distributedlog.service.stream.StreamManagerImpl;
 import com.twitter.distributedlog.service.stream.StreamManager;
-import com.twitter.distributedlog.DLMTestUtil;
-import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.distributedlog.service.stream.StreamManagerImpl;
 import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
 import com.twitter.util.Duration;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.SocketAddress;
 import java.net.URI;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
 
-import static org.junit.Assert.*;
-
+/**
+ * Base test case for distributedlog servers.
+ */
 public abstract class DistributedLogServerTestCase {
 
-    protected static final Logger LOG = LoggerFactory.getLogger(DistributedLogServerTestCase.class);
-
     protected static DistributedLogConfiguration conf =
             new DistributedLogConfiguration().setLockTimeout(10)
                     .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
@@ -59,6 +58,9 @@ public abstract class DistributedLogServerTestCase {
     protected static DistributedLogCluster dlCluster;
     protected static DistributedLogCluster noAdHocCluster;
 
+    /**
+     * A distributedlog client wrapper for testing.
+     */
     protected static class DLClient {
         public final LocalRoutingService routingService;
         public DistributedLogClientBuilder dlClientBuilder;
@@ -94,6 +96,9 @@ public abstract class DistributedLogServerTestCase {
         }
     }
 
+    /**
+     * A distributedlog client wrapper that talks to two regions.
+     */
     protected static class TwoRegionDLClient {
 
         public final LocalRoutingService localRoutingService;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
index 218ea06..24d7f07 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
@@ -17,22 +17,30 @@
  */
 package com.twitter.distributedlog.service;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import com.google.common.base.Optional;
 import com.twitter.distributedlog.AsyncLogReader;
 import com.twitter.distributedlog.DLMTestUtil;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.TestZooKeeperClientBuilder;
-import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
 import com.twitter.distributedlog.LogReader;
 import com.twitter.distributedlog.LogRecord;
 import com.twitter.distributedlog.LogRecordWithDLSN;
+import com.twitter.distributedlog.TestZooKeeperClientBuilder;
 import com.twitter.distributedlog.ZooKeeperClient;
 import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.impl.acl.ZKAccessControl;
+import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
 import com.twitter.distributedlog.client.routing.LocalRoutingService;
 import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.impl.acl.ZKAccessControl;
 import com.twitter.distributedlog.impl.metadata.BKDLConfig;
 import com.twitter.distributedlog.namespace.DistributedLogNamespace;
 import com.twitter.distributedlog.service.stream.StreamManagerImpl;
@@ -49,13 +57,6 @@ import com.twitter.util.Await;
 import com.twitter.util.Duration;
 import com.twitter.util.Future;
 import com.twitter.util.Futures;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -64,17 +65,19 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import static com.google.common.base.Charsets.UTF_8;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+/**
+ * Test Case for {@link DistributedLogServer}.
+ */
 public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase {
-    static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
+
+    private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
 
     @Rule
     public TestName testName = new TestName();
@@ -84,7 +87,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
     }
 
     /**
-     * {@link https://issues.apache.org/jira/browse/DL-27}
+     * Flaky test; see <a href="https://issues.apache.org/jira/browse/DL-27">DL-27</a>.
      */
     @DistributedLogAnnotations.FlakyTest
     @Ignore
@@ -212,10 +215,11 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
         writes.add(null);
 
         try {
-            List<Future<DLSN>> futureResult = dlClient.dlClient.writeBulk(name, writes);
+            dlClient.dlClient.writeBulk(name, writes);
             fail("should not have succeeded");
         } catch (NullPointerException npe) {
-            ; // expected
+            // expected
+            logger.info("Expected to catch NullPointException.");
         }
     }
 
@@ -246,7 +250,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
         for (int i = start; i < finish; i++) {
             Future<DLSN> future = futures.get(i);
             try {
-                DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+                Await.result(future, Duration.fromSeconds(10));
                 fail("future should have failed!");
             } catch (DLException cre) {
                 ++failed;
@@ -259,7 +263,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
 
     void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
         try {
-            DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+            Await.result(future, Duration.fromSeconds(10));
             fail("should have failed");
         } catch (DLException dle) {
             assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode());
@@ -276,7 +280,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
 
         final int writeCount = 100;
 
-        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount*2 + 1);
+        List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1);
         for (long i = 1; i <= writeCount; i++) {
             writes.add(ByteBuffer.wrap(("" + i).getBytes()));
         }
@@ -293,7 +297,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
         for (int i = 0; i < writeCount; i++) {
             Future<DLSN> future = futures.get(i);
             try {
-                DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+                Await.result(future, Duration.fromSeconds(10));
                 ++succeeded;
             } catch (Exception ex) {
                 failDueToWrongException(ex);
@@ -510,7 +514,9 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
         }
     }
 
-    /** This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing */
+    /**
+     * Tests that create has touch-like behavior: trying to create the same stream twice simply does nothing.
+     */
     @Test(timeout = 60000)
     public void testCreateStreamTwice() throws Exception {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java
index 0e5f187..b776543 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java
@@ -17,14 +17,13 @@
  */
 package com.twitter.distributedlog.service;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.fail;
+
 import com.twitter.finagle.NoBrokersAvailableException;
 import com.twitter.util.Await;
-import org.junit.Test;
-
 import java.nio.ByteBuffer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.fail;
+import org.junit.Test;
 
 /**
  * Test the server with client side routing.

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index f97399d..d9d2b21 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -17,10 +17,17 @@
  */
 package com.twitter.distributedlog.service;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import com.google.common.collect.Lists;
 import com.twitter.distributedlog.DLSN;
 import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.TestDistributedLogBase;
 import com.twitter.distributedlog.acl.DefaultAccessControlManager;
 import com.twitter.distributedlog.client.routing.LocalRoutingService;
@@ -29,11 +36,11 @@ import com.twitter.distributedlog.exceptions.StreamUnavailableException;
 import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
 import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
-import com.twitter.distributedlog.service.stream.WriteOp;
-import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus;
+import com.twitter.distributedlog.service.stream.Stream;
 import com.twitter.distributedlog.service.stream.StreamImpl;
+import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus;
 import com.twitter.distributedlog.service.stream.StreamManagerImpl;
-import com.twitter.distributedlog.service.stream.Stream;
+import com.twitter.distributedlog.service.stream.WriteOp;
 import com.twitter.distributedlog.service.streamset.DelimiterStreamPartitionConverter;
 import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
@@ -43,8 +50,15 @@ import com.twitter.distributedlog.thrift.service.WriteContext;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.ConfUtils;
 import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.util.Await;
 import com.twitter.util.Future;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.feature.SettableFeature;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ReflectionUtils;
@@ -57,22 +71,12 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
 /**
- * Test Case for DistributedLog Service
+ * Test Case for DistributedLog Service.
  */
 public class TestDistributedLogService extends TestDistributedLogBase {
 
-    static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class);
+    private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class);
 
     @Rule
     public TestName testName = new TestName();
@@ -357,12 +361,12 @@ public class TestDistributedLogService extends TestDistributedLogBase {
 
         Future<Void> closeFuture0 = s.requestClose("close 0");
         assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
-                StreamStatus.CLOSING == s.getStatus() ||
-                StreamStatus.CLOSED == s.getStatus());
+                StreamStatus.CLOSING == s.getStatus()
+                    || StreamStatus.CLOSED == s.getStatus());
         Future<Void> closeFuture1 = s.requestClose("close 1");
         assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
-                StreamStatus.CLOSING == s.getStatus() ||
-                StreamStatus.CLOSED == s.getStatus());
+                StreamStatus.CLOSING == s.getStatus()
+                    || StreamStatus.CLOSED == s.getStatus());
 
         Await.result(closeFuture0);
         assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
@@ -386,8 +390,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
 
         Future<Void> closeFuture = s.requestClose("close");
         assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
-                StreamStatus.CLOSING == s.getStatus() ||
-                StreamStatus.CLOSED == s.getStatus());
+                StreamStatus.CLOSING == s.getStatus()
+                    || StreamStatus.CLOSED == s.getStatus());
         WriteOp op1 = createWriteOp(service, streamName, 0L);
         s.submit(op1);
         WriteResponse response1 = Await.result(op1.result());
@@ -430,8 +434,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
 
         StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName);
         // the stream should be set CLOSING
-        while (StreamStatus.CLOSING != s.getStatus() &&
-                StreamStatus.CLOSED != s.getStatus()) {
+        while (StreamStatus.CLOSING != s.getStatus()
+            && StreamStatus.CLOSED != s.getStatus()) {
             TimeUnit.MILLISECONDS.sleep(20);
         }
         assertNotNull("Writer should be initialized", s.getWriter());
@@ -443,9 +447,9 @@ public class TestDistributedLogService extends TestDistributedLogBase {
                     futureList.get(i).isDefined());
             WriteResponse response = Await.result(futureList.get(i));
             assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION,
-                    StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() ||
-                    StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() ||
-                    StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
+                    StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
+                        || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+                        || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
         }
 
         while (streamManager.getCachedStreams().containsKey(streamName)) {
@@ -518,7 +522,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
     public void testTruncateOpNoChecksum() throws Exception {
         DistributedLogServiceImpl localService = createConfiguredLocalService();
         WriteContext ctx = new WriteContext();
-        Future<WriteResponse> result = localService.truncate("test", new DLSN(1,2,3).serialize(), ctx);
+        Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
         WriteResponse resp = Await.result(result);
         assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
         localService.shutdown();
@@ -580,7 +584,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
             ProtocolUtils.writeOpCRC32("test", buffer.array()));
 
         // Overwrite 1 byte to corrupt data.
-        buffer.put(1, (byte)0xab);
+        buffer.put(1, (byte) 0xab);
         Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx);
         WriteResponse resp = Await.result(result);
         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
@@ -607,7 +611,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
     public void testTruncateOpChecksumBadChecksum() throws Exception {
         DistributedLogServiceImpl localService = createConfiguredLocalService();
         WriteContext ctx = new WriteContext().setCrc32(999);
-        Future<WriteResponse> result = localService.truncate("test", new DLSN(1,2,3).serialize(), ctx);
+        Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
         WriteResponse resp = Await.result(result);
         assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
         localService.shutdown();
@@ -620,7 +624,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
             new NullStatsLogger(),
             new IdentityStreamPartitionConverter(),
             new ServerConfiguration(),
-            (byte)0,
+            (byte) 0,
             checksum,
             false,
             disabledFeature,
@@ -651,8 +655,14 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         String streamName = testName.getMethodName();
 
         SettableFeature disabledFeature = new SettableFeature("", 1);
-        WriteOp writeOp0 = getWriteOp(streamName, disabledFeature, ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
-        WriteOp writeOp1 = getWriteOp(streamName, disabledFeature, ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
+        WriteOp writeOp0 = getWriteOp(
+            streamName,
+            disabledFeature,
+            ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
+        WriteOp writeOp1 = getWriteOp(
+            streamName,
+            disabledFeature,
+            ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
 
         writeOp0.preExecute();
         disabledFeature.set(0);
@@ -699,9 +709,9 @@ public class TestDistributedLogService extends TestDistributedLogBase {
         for (Future<WriteResponse> future : futureList) {
             WriteResponse response = Await.result(future);
             assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(),
-                    StatusCode.SUCCESS == response.getHeader().getCode() ||
-                    StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() ||
-                    StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode());
+                    StatusCode.SUCCESS == response.getHeader().getCode()
+                        || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+                        || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode());
         }
         assertTrue("There should be no streams in the cache",
                 streamManager.getCachedStreams().isEmpty());
@@ -757,9 +767,9 @@ public class TestDistributedLogService extends TestDistributedLogBase {
             WriteResponse response = Await.result(future);
             assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : "
                     + response.getHeader().getCode(),
-                    StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() ||
-                    StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() ||
-                    StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
+                    StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
+                        || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+                        || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
         }
         // acquired streams should all been removed after we close them
         assertTrue("There should be no streams in the acquired cache",

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java
index f4c140e..50c915f 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java
@@ -20,6 +20,12 @@ package com.twitter.distributedlog.service;
 import com.twitter.distributedlog.DistributedLogConfiguration;
 import com.twitter.distributedlog.feature.DefaultFeatureProvider;
 import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.apache.bookkeeper.feature.SettableFeature;
@@ -28,18 +34,14 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link com.twitter.distributedlog.exceptions.RegionUnavailableException}.
+ */
 public class TestRegionUnavailable extends DistributedLogServerTestCase {
 
+    /**
+     * A feature provider for testing.
+     */
     public static class TestFeatureProvider extends DefaultFeatureProvider {
 
         public TestFeatureProvider(String rootScope,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java
index eb65ad4..d8ef302 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java
@@ -17,18 +17,19 @@
  */
 package com.twitter.distributedlog.service;
 
+import static org.junit.Assert.assertEquals;
+
 import com.twitter.finagle.Service;
 import com.twitter.finagle.service.ConstantService;
 import com.twitter.util.Await;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.bookkeeper.stats.NullStatsLogger;
-
+import org.apache.bookkeeper.stats.StatsLogger;
 import org.junit.Test;
 
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link StatsFilter}.
+ */
 public class TestStatsFilter {
 
     class RuntimeExService<Req, Rep> extends Service<Req, Rep> {

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java
index 9e04089..2ae3a23 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java
@@ -17,13 +17,15 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
 
 import java.util.HashMap;
 import java.util.Map;
+import org.junit.Test;
 
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link BalancerUtils}.
+ */
 public class TestBalancerUtils {
 
     @Test(timeout = 60000)

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java
index 05eb724..8a24e21 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java
@@ -17,6 +17,9 @@
  */
 package com.twitter.distributedlog.service.balancer;
 
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.fail;
+
 import com.google.common.base.Optional;
 import com.google.common.util.concurrent.RateLimiter;
 import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
@@ -25,6 +28,11 @@ import com.twitter.distributedlog.service.DistributedLogClient;
 import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
 import com.twitter.distributedlog.service.DistributedLogServerTestCase;
 import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 import org.apache.commons.lang3.tuple.Pair;
 import org.junit.After;
 import org.junit.Before;
@@ -33,18 +41,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link ClusterBalancer}.
+ */
 public class TestClusterBalancer extends DistributedLogServerTestCase {
 
-    static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class);
+    private static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class);
 
     private final int numServers = 5;
     private final List<DLServer> cluster;
@@ -108,7 +110,7 @@ public class TestClusterBalancer extends DistributedLogServerTestCase {
         for (int i = 0; i < numStreams; i++) {
             String name = namePrefix + (streamId++);
             try {
-                Await.result(((DistributedLogClient) client.dlClient).write(name, ByteBuffer.wrap(name.getBytes(UTF_8))));
+                Await.result(client.dlClient.write(name, ByteBuffer.wrap(name.getBytes(UTF_8))));
             } catch (Exception e) {
                 logger.error("Error writing stream {} : ", name, e);
                 throw e;
@@ -145,7 +147,7 @@ public class TestClusterBalancer extends DistributedLogServerTestCase {
         Optional<RateLimiter> rateLimiter = Optional.absent();
 
         Balancer balancer = new ClusterBalancer(client.dlClientBuilder,
-                Pair.of((DistributedLogClient)client.dlClient, (MonitorServiceClient)client.dlClient));
+                Pair.of((DistributedLogClient) client.dlClient, (MonitorServiceClient) client.dlClient));
         logger.info("Rebalancing from 'unknown' target");
         try {
             balancer.balanceAll("unknown", 10, rateLimiter);



[3/4] incubator-distributedlog git commit: DL-132: Enable check style for distributedlog service module.

Posted by si...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
index 111a874..0ee7db4 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
@@ -21,26 +21,27 @@ import com.google.common.base.Optional;
 import com.twitter.distributedlog.config.DynamicConfigurationFactory;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.File;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Provide per stream configuration to DistributedLog service layer.
  */
 public class ServiceStreamConfigProvider implements StreamConfigProvider {
-    static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
+
+    private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
+
+    private static final String CONFIG_EXTENSION = "conf";
 
     private final File configBaseDir;
     private final File defaultConfigFile;
     private final StreamPartitionConverter partitionConverter;
     private final DynamicConfigurationFactory configFactory;
     private final DynamicDistributedLogConfiguration defaultDynConf;
-    private final static String CONFIG_EXTENSION = "conf";
 
     public ServiceStreamConfigProvider(String configPath,
                                        String defaultConfigPath,
@@ -51,11 +52,13 @@ public class ServiceStreamConfigProvider implements StreamConfigProvider {
                                        throws ConfigurationException {
         this.configBaseDir = new File(configPath);
         if (!configBaseDir.exists()) {
-            throw new ConfigurationException("Stream configuration base directory " + configPath + " does not exist");
+            throw new ConfigurationException("Stream configuration base directory "
+                + configPath + " does not exist");
         }
         this.defaultConfigFile = new File(configPath);
         if (!defaultConfigFile.exists()) {
-            throw new ConfigurationException("Stream configuration default config " + defaultConfigPath + " does not exist");
+            throw new ConfigurationException("Stream configuration default config "
+                + defaultConfigPath + " does not exist");
         }
 
         // Construct reloading default configuration

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
new file mode 100644
index 0000000..bb0026a
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Server Configurations.
+ */
+package com.twitter.distributedlog.service.config;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
new file mode 100644
index 0000000..4fb3673
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Proxy Service.
+ */
+package com.twitter.distributedlog.service;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
index 144e358..fb2d6d2 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,18 +20,20 @@ package com.twitter.distributedlog.service.placement;
 import com.twitter.util.Future;
 
 /**
- * Created for those who hold these truths to be self-evident, that all streams are created equal,
+ * Equal Load Appraiser.
+ *
+ * <p>Created for those who hold these truths to be self-evident, that all streams are created equal,
  * that they are endowed by their creator with certain unalienable loads, that among these are
  * Uno, Eins, and One.
  */
 public class EqualLoadAppraiser implements LoadAppraiser {
-  @Override
-  public Future<StreamLoad> getStreamLoad(String stream) {
-    return Future.value(new StreamLoad(stream, 1));
-  }
+    @Override
+    public Future<StreamLoad> getStreamLoad(String stream) {
+        return Future.value(new StreamLoad(stream, 1));
+    }
 
-  @Override
-  public Future<Void> refreshCache() {
-    return Future.value(null);
-  }
+    @Override
+    public Future<Void> refreshCache() {
+        return Future.value(null);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
index 8c8dc23..c25c267 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,12 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -24,174 +30,171 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
-
-import scala.Function1;
-import scala.runtime.BoxedUnit;
-
 import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.Stats;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
-
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
 
 /**
- * A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
+ * Least Load Placement Policy.
+ *
+ * <p>A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
  * evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
  * the load of a server would be. This placement policy then distributes these streams across the
  * servers.
  */
 public class LeastLoadPlacementPolicy extends PlacementPolicy {
-  private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
-  private Map<String, String> streamToServer = new HashMap<String, String>();
-
-  public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                                  DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
-                                  Duration refreshInterval, StatsLogger statsLogger) {
-    super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
-    statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
-      @Override
-      public Number getDefaultValue() {
-        return 0;
-      }
-
-      @Override
-      public Number getSample() {
-        if (serverLoads.size() > 0) {
-          return serverLoads.last().getLoad() - serverLoads.first().getLoad();
-        } else {
-          return getDefaultValue();
-        }
-      }
-    });
-  }
-
-  private synchronized String getStreamOwner(String stream) {
-    return streamToServer.get(stream);
-  }
-
-  @Override
-  public Future<String> placeStream(String stream) {
-    String streamOwner = getStreamOwner(stream);
-    if (null != streamOwner) {
-      return Future.value(streamOwner);
+
+    private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class);
+
+    private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+    private Map<String, String> streamToServer = new HashMap<String, String>();
+
+    public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                                    DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                                    Duration refreshInterval, StatsLogger statsLogger) {
+        super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
+        statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
+            @Override
+            public Number getDefaultValue() {
+                return 0;
+            }
+
+            @Override
+            public Number getSample() {
+                if (serverLoads.size() > 0) {
+                    return serverLoads.last().getLoad() - serverLoads.first().getLoad();
+                } else {
+                    return getDefaultValue();
+                }
+            }
+        });
     }
-    Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
-    return streamLoadFuture.map(new Function<StreamLoad, String>() {
-      @Override
-      public String apply(StreamLoad streamLoad) {
-        return placeStreamSynchronized(streamLoad);
-      }
-    });
-  }
-
-  synchronized private String placeStreamSynchronized(StreamLoad streamLoad) {
-    ServerLoad serverLoad = serverLoads.pollFirst();
-    serverLoad.addStream(streamLoad);
-    serverLoads.add(serverLoad);
-    return serverLoad.getServer();
-  }
-
-  @Override
-  public void refresh() {
-    logger.info("Refreshing server loads.");
-    Future<Void> refresh = loadAppraiser.refreshCache();
-    final Set<String> servers = getServers();
-    final Set<String> allStreams = getStreams();
-    Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() {
-      @Override
-      public Future<TreeSet<ServerLoad>> apply(Void v1) {
-        return calculate(servers, allStreams);
-      }
-    });
-    serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
-      @Override
-      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
-        try {
-          updateServerLoads(serverLoads);
-        } catch (PlacementStateManager.StateManagerSaveException e) {
-          logger.error("The refreshed mapping could not be persisted and will not be used.", e);
-        }
-        return BoxedUnit.UNIT;
-      }
-    });
-  }
-
-  synchronized private void updateServerLoads(TreeSet<ServerLoad> serverLoads) throws PlacementStateManager.StateManagerSaveException {
-    this.placementStateManager.saveOwnership(serverLoads);
-    this.streamToServer = serverLoadsToMap(serverLoads);
-    this.serverLoads = serverLoads;
-  }
-
-  @Override
-  synchronized public void load(TreeSet<ServerLoad> serverLoads) {
-    this.serverLoads = serverLoads;
-    this.streamToServer = serverLoadsToMap(serverLoads);
-  }
-
-  public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
-    logger.info("Calculating server loads");
-    final long startTime = System.currentTimeMillis();
-    ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
-
-    for (String stream: streams) {
-      Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
-      futures.add(streamLoad);
+
+    private synchronized String getStreamOwner(String stream) {
+        return streamToServer.get(stream);
     }
 
-    return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
-      @Override
-      public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
-        /* Sort streamLoads so largest streams are placed first for better balance */
-        TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
-        for (StreamLoad streamLoad: streamLoads) {
-          streamQueue.add(streamLoad);
+    @Override
+    public Future<String> placeStream(String stream) {
+        String streamOwner = getStreamOwner(stream);
+        if (null != streamOwner) {
+            return Future.value(streamOwner);
         }
+        Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
+        return streamLoadFuture.map(new Function<StreamLoad, String>() {
+            @Override
+            public String apply(StreamLoad streamLoad) {
+                return placeStreamSynchronized(streamLoad);
+            }
+        });
+    }
 
-        TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
-        for (String server: servers) {
-          ServerLoad serverLoad = new ServerLoad(server);
-          if (!streamQueue.isEmpty()) {
-            serverLoad.addStream(streamQueue.pollFirst());
-          }
-          serverLoads.add(serverLoad);
+    private synchronized String placeStreamSynchronized(StreamLoad streamLoad) {
+        ServerLoad serverLoad = serverLoads.pollFirst();
+        serverLoad.addStream(streamLoad);
+        serverLoads.add(serverLoad);
+        return serverLoad.getServer();
+    }
+
+    @Override
+    public void refresh() {
+        logger.info("Refreshing server loads.");
+        Future<Void> refresh = loadAppraiser.refreshCache();
+        final Set<String> servers = getServers();
+        final Set<String> allStreams = getStreams();
+        Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(
+            new Function<Void, Future<TreeSet<ServerLoad>>>() {
+            @Override
+            public Future<TreeSet<ServerLoad>> apply(Void v1) {
+                return calculate(servers, allStreams);
+            }
+        });
+        serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+                try {
+                    updateServerLoads(serverLoads);
+                } catch (PlacementStateManager.StateManagerSaveException e) {
+                    logger.error("The refreshed mapping could not be persisted and will not be used.", e);
+                }
+                return BoxedUnit.UNIT;
+            }
+        });
+    }
+
+    private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads)
+        throws PlacementStateManager.StateManagerSaveException {
+        this.placementStateManager.saveOwnership(serverLoads);
+        this.streamToServer = serverLoadsToMap(serverLoads);
+        this.serverLoads = serverLoads;
+    }
+
+    @Override
+    public synchronized void load(TreeSet<ServerLoad> serverLoads) {
+        this.serverLoads = serverLoads;
+        this.streamToServer = serverLoadsToMap(serverLoads);
+    }
+
+    public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
+        logger.info("Calculating server loads");
+        final long startTime = System.currentTimeMillis();
+        ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
+
+        for (String stream : streams) {
+            Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
+            futures.add(streamLoad);
         }
 
-        while (!streamQueue.isEmpty()) {
-          ServerLoad serverLoad = serverLoads.pollFirst();
-          serverLoad.addStream(streamQueue.pollFirst());
-          serverLoads.add(serverLoad);
+        return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
+            @Override
+            public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
+                /* Sort streamLoads so largest streams are placed first for better balance */
+                TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
+                for (StreamLoad streamLoad : streamLoads) {
+                    streamQueue.add(streamLoad);
+                }
+
+                TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+                for (String server : servers) {
+                    ServerLoad serverLoad = new ServerLoad(server);
+                    if (!streamQueue.isEmpty()) {
+                        serverLoad.addStream(streamQueue.pollFirst());
+                    }
+                    serverLoads.add(serverLoad);
+                }
+
+                while (!streamQueue.isEmpty()) {
+                    ServerLoad serverLoad = serverLoads.pollFirst();
+                    serverLoad.addStream(streamQueue.pollFirst());
+                    serverLoads.add(serverLoad);
+                }
+                return serverLoads;
+            }
+        }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+                placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+                return BoxedUnit.UNIT;
+            }
+        }).onFailure(new Function<Throwable, BoxedUnit>() {
+            @Override
+            public BoxedUnit apply(Throwable t) {
+                logger.error("Failure calculating loads", t);
+                placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+                return BoxedUnit.UNIT;
+            }
+        });
+    }
+
+    private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
+        HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
+        for (ServerLoad serverLoad : serverLoads) {
+            for (StreamLoad streamLoad : serverLoad.getStreamLoads()) {
+                streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
+            }
         }
-        return serverLoads;
-      }
-    }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
-      @Override
-      public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
-        placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
-        return BoxedUnit.UNIT;
-      }
-    }).onFailure(new Function<Throwable, BoxedUnit>() {
-      @Override
-      public BoxedUnit apply(Throwable t) {
-        logger.error("Failure calculating loads", t);
-        placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
-        return BoxedUnit.UNIT;
-      }
-    });
-  }
-
-  private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
-    HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
-    for (ServerLoad serverLoad: serverLoads) {
-      for (StreamLoad streamLoad: serverLoad.getStreamLoads()) {
-        streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
-      }
+        return streamToServer;
     }
-    return streamToServer;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
index 784f106..53c568a 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
@@ -19,7 +19,21 @@ package com.twitter.distributedlog.service.placement;
 
 import com.twitter.util.Future;
 
+/**
+ * Interface for load appraiser.
+ */
 public interface LoadAppraiser {
-  Future<StreamLoad> getStreamLoad(String stream);
-  Future<Void> refreshCache();
+    /**
+     * Retrieve the stream load for a given {@code stream}.
+     *
+     * @param stream name of the stream
+     * @return the stream load of the stream.
+     */
+    Future<StreamLoad> getStreamLoad(String stream);
+
+    /**
+     * Refresh the cache.
+     * @return a future for the cache refresh operation.
+     */
+    Future<Void> refreshCache();
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
index 2044428..46e8940 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,15 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.service.DLSocketAddress;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
@@ -24,125 +33,116 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Set;
 import java.util.TreeSet;
-
-import scala.runtime.BoxedUnit;
-
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.service.DLSocketAddress;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
+import scala.runtime.BoxedUnit;
 
 /**
- * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream
- * contains. The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
+ * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains.
+ *
+ * <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
  * then distributed these StreamLoads to the available servers in a manner defined by the
  * implementation creating ServerLoad objects. It then saves this assignment via the
  * PlacementStateManager.
  */
 public abstract class PlacementPolicy {
-  protected final LoadAppraiser loadAppraiser;
-  protected final RoutingService routingService;
-  protected final DistributedLogNamespace namespace;
-  protected final PlacementStateManager placementStateManager;
-  private final Duration refreshInterval;
 
-  protected static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
-  protected final OpStatsLogger placementCalcStats;
-  private Timer placementRefreshTimer;
+    private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
 
-  public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
-                         DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
-                         Duration refreshInterval, StatsLogger statsLogger) {
-    this.loadAppraiser = loadAppraiser;
-    this.routingService = routingService;
-    this.namespace = namespace;
-    this.placementStateManager = placementStateManager;
-    this.refreshInterval = refreshInterval;
-    placementCalcStats = statsLogger.getOpStatsLogger("placement");
-  }
+    protected final LoadAppraiser loadAppraiser;
+    protected final RoutingService routingService;
+    protected final DistributedLogNamespace namespace;
+    protected final PlacementStateManager placementStateManager;
+    private final Duration refreshInterval;
+    protected final OpStatsLogger placementCalcStats;
+    private Timer placementRefreshTimer;
 
-  public Set<String> getServers() {
-    Set<SocketAddress> hosts = routingService.getHosts();
-    Set<String> servers = new HashSet<String>(hosts.size());
-    for (SocketAddress address: hosts) {
-      servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+    public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+                           DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+                           Duration refreshInterval, StatsLogger statsLogger) {
+        this.loadAppraiser = loadAppraiser;
+        this.routingService = routingService;
+        this.namespace = namespace;
+        this.placementStateManager = placementStateManager;
+        this.refreshInterval = refreshInterval;
+        placementCalcStats = statsLogger.getOpStatsLogger("placement");
     }
-    return servers;
-  }
 
-  public Set<String> getStreams() {
-    Set<String> streams = new HashSet<String>();
-    try {
-      Iterator<String> logs = namespace.getLogs();
-      while (logs.hasNext()) {
-        streams.add(logs.next());
-      }
-    } catch (IOException e) {
-      logger.error("Could not get streams for placement policy.", e);
+    public Set<String> getServers() {
+        Set<SocketAddress> hosts = routingService.getHosts();
+        Set<String> servers = new HashSet<String>(hosts.size());
+        for (SocketAddress address : hosts) {
+            servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+        }
+        return servers;
     }
-    return streams;
-  }
-
-  public void start(boolean leader) {
-    logger.info("Starting placement policy");
 
-    TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
-    for (String server: getServers()) {
-      emptyServerLoads.add(new ServerLoad(server));
+    public Set<String> getStreams() {
+        Set<String> streams = new HashSet<String>();
+        try {
+            Iterator<String> logs = namespace.getLogs();
+            while (logs.hasNext()) {
+                streams.add(logs.next());
+            }
+        } catch (IOException e) {
+            logger.error("Could not get streams for placement policy.", e);
+        }
+        return streams;
     }
-    load(emptyServerLoads); //Pre-Load so streams don't NPE
-    if (leader) { //this is the leader shard
-      logger.info("Shard is leader. Scheduling timed refresh.");
-      placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
-      placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
-        @Override
-        public BoxedUnit apply() {
-          refresh();
-          return BoxedUnit.UNIT;
+
+    public void start(boolean leader) {
+        logger.info("Starting placement policy");
+
+        TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
+        for (String server : getServers()) {
+            emptyServerLoads.add(new ServerLoad(server));
         }
-      });
-    } else {
-      logger.info("Shard is not leader. Watching for server load changes.");
-      placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
-        @Override
-        public void callback(TreeSet<ServerLoad> serverLoads) {
-          if (!serverLoads.isEmpty()) {
-            load(serverLoads);
-          }
+        load(emptyServerLoads); //Pre-Load so streams don't NPE
+        if (leader) { //this is the leader shard
+            logger.info("Shard is leader. Scheduling timed refresh.");
+            placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
+            placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
+                @Override
+                public BoxedUnit apply() {
+                    refresh();
+                    return BoxedUnit.UNIT;
+                }
+            });
+        } else {
+            logger.info("Shard is not leader. Watching for server load changes.");
+            placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
+                @Override
+                public void callback(TreeSet<ServerLoad> serverLoads) {
+                    if (!serverLoads.isEmpty()) {
+                        load(serverLoads);
+                    }
+                }
+            });
         }
-      });
     }
-  }
 
-  public void close() {
-    if (placementRefreshTimer != null) {
-      placementRefreshTimer.stop();
+    public void close() {
+        if (placementRefreshTimer != null) {
+            placementRefreshTimer.stop();
+        }
     }
-  }
 
-  /**
-   * Places the stream on a server according to the policy and returns a future contianing the
-   * host that owns the stream upon completion
-   */
-  public abstract Future<String> placeStream(String stream);
+    /**
+     * Places the stream on a server according to the policy.
+     *
+     * <p>It returns a future containing the host that owns the stream upon completion.
+     */
+    public abstract Future<String> placeStream(String stream);
 
-  /**
-   * Recalculates the entire placement mapping and updates stores it using the PlacementStateManager
-   */
-  public abstract void refresh();
+    /**
+     * Recalculates the entire placement mapping and stores it using the PlacementStateManager.
+     */
+    public abstract void refresh();
 
-  /**
-   * Loads the placement mapping into the node from a TreeSet of ServerLoads
-   */
-  public abstract void load(TreeSet<ServerLoad> serverLoads);
+    /**
+     * Loads the placement mapping into the node from a TreeSet of ServerLoads.
+     */
+    public abstract void load(TreeSet<ServerLoad> serverLoads);
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
index cd0d906..17e4685 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,46 +20,60 @@ package com.twitter.distributedlog.service.placement;
 import java.util.TreeSet;
 
 /**
- * The PlacementStateManager handles persistence of calculated resource placements including, the
- * storage once the calculated, and the retrieval by the other shards.
+ * The PlacementStateManager handles persistence of calculated resource placements.
  */
 public interface PlacementStateManager {
 
-  /**
-   * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage
-   */
-  void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
+    /**
+     * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage.
+     */
+    void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
 
-  /**
-   * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage
-   */
-  TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
+    /**
+     * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage.
+     */
+    TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
 
-  /**
-   * Watch the persistent storage for changes to the ownership mapping and calls placementCallback
-   * with the new mapping when a change occurs
-   */
-  void watch(PlacementCallback placementCallback);
+    /**
+     * Watch the persistent storage for changes to the ownership mapping.
+     *
+     * <p>The placementCallback will be triggered with the new mapping when a change occurs.
+     */
+    void watch(PlacementCallback placementCallback);
 
-  interface PlacementCallback {
-    void callback(TreeSet<ServerLoad> serverLoads);
-  }
+    /**
+     * Placement Callback.
+     *
+     * <p>The callback is triggered when server loads are updated.
+     */
+    interface PlacementCallback {
+        void callback(TreeSet<ServerLoad> serverLoads);
+    }
 
-  abstract class StateManagerException extends Exception {
-    public StateManagerException(String message, Exception e) {
-      super(message, e);
+    /**
+     * The base exception thrown when state manager encounters errors.
+     */
+    abstract class StateManagerException extends Exception {
+        public StateManagerException(String message, Exception e) {
+            super(message, e);
+        }
     }
-  }
 
-  class StateManagerLoadException extends StateManagerException {
-    public StateManagerLoadException(Exception e) {
-      super("Load of Ownership failed", e);
+    /**
+     * Exception thrown when loading the ownership mapping fails.
+     */
+    class StateManagerLoadException extends StateManagerException {
+        public StateManagerLoadException(Exception e) {
+            super("Load of Ownership failed", e);
+        }
     }
-  }
 
-  class StateManagerSaveException extends StateManagerException {
-    public StateManagerSaveException(Exception e) {
-      super("Save of Ownership failed", e);
+    /**
+     * Exception thrown when saving the ownership mapping fails.
+     */
+    class StateManagerSaveException extends StateManagerException {
+        public StateManagerSaveException(Exception e) {
+            super("Save of Ownership failed", e);
+        }
     }
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
index 801e499..a0b4959 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,139 +17,142 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
-import com.twitter.distributedlog.service.placement.thrift.*;
-
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.thrift.transport.TMemoryInputTransport;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 /**
- * A comparable data object containing the identifier of the server, total appraised load on the
+ * An object representing the server load.
+ *
+ * <p>A comparable data object containing the identifier of the server, total appraised load on the
  * server, and all streams assigned to the server by the resource placement mapping. This is
  * comparable first by load and then by server so that a sorted data structure of these will be
  * consistent across multiple calculations.
  */
 public class ServerLoad implements Comparable {
-  private static final int BUFFER_SIZE = 4096000;
-  private final String server;
-  private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
-  private long load = 0l;
-
-  public ServerLoad(String server) {
-    this.server = server;
-  }
-
-  synchronized public long addStream(StreamLoad stream) {
-    this.load += stream.getLoad();
-    streamLoads.add(stream);
-    return this.load;
-  }
-
-  synchronized public long removeStream(String stream) {
-    for (StreamLoad streamLoad : streamLoads) {
-      if (streamLoad.stream.equals(stream)) {
-        this.load -= streamLoad.getLoad();
-        streamLoads.remove(streamLoad);
+    private static final int BUFFER_SIZE = 4096000;
+    private final String server;
+    private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
+    private long load = 0L;
+
+    public ServerLoad(String server) {
+        this.server = server;
+    }
+
+    public synchronized long addStream(StreamLoad stream) {
+        this.load += stream.getLoad();
+        streamLoads.add(stream);
         return this.load;
-      }
     }
-    return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
-  }
-
-  public synchronized long getLoad() {
-    return load;
-  }
-
-  public synchronized Set<StreamLoad> getStreamLoads() {
-    return streamLoads;
-  }
-
-  public synchronized String getServer() {
-    return server;
-  }
-
-  protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
-    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
-        = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
-    tServerLoad.setServer(server);
-    tServerLoad.setLoad(load);
-    ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads
-        = new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
-    for (StreamLoad streamLoad: streamLoads) {
-      tStreamLoads.add(streamLoad.toThrift());
+
+    public synchronized long removeStream(String stream) {
+        for (StreamLoad streamLoad : streamLoads) {
+            if (streamLoad.stream.equals(stream)) {
+                this.load -= streamLoad.getLoad();
+                streamLoads.remove(streamLoad);
+                return this.load;
+            }
+        }
+        return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
+    }
+
+    public synchronized long getLoad() {
+        return load;
+    }
+
+    public synchronized Set<StreamLoad> getStreamLoads() {
+        return streamLoads;
+    }
+
+    public synchronized String getServer() {
+        return server;
+    }
+
+    protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+        com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
+            new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+        tServerLoad.setServer(server);
+        tServerLoad.setLoad(load);
+        ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads =
+            new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
+        for (StreamLoad streamLoad : streamLoads) {
+            tStreamLoads.add(streamLoad.toThrift());
+        }
+        tServerLoad.setStreams(tStreamLoads);
+        return tServerLoad;
+    }
+
+    public byte[] serialize() throws IOException {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            toThrift().write(protocol);
+            transport.flush();
+            return transport.toString(UTF_8.name()).getBytes(UTF_8);
+        } catch (TException e) {
+            throw new IOException("Failed to serialize server load : ", e);
+        } catch (UnsupportedEncodingException uee) {
+            throw new IOException("Failed to serialize server load : ", uee);
+        }
+    }
+
+    public static ServerLoad deserialize(byte[] data) throws IOException {
+        com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
+            new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            tServerLoad.read(protocol);
+            ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
+            if (tServerLoad.isSetStreams()) {
+                for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad :
+                    tServerLoad.getStreams()) {
+                    serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+                }
+            }
+            return serverLoad;
+        } catch (TException e) {
+            throw new IOException("Failed to deserialize server load : ", e);
+        }
     }
-    tServerLoad.setStreams(tStreamLoads);
-    return tServerLoad;
-  }
-
-  public byte[] serialize() throws IOException {
-    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      toThrift().write(protocol);
-      transport.flush();
-      return transport.toString(UTF_8.name()).getBytes(UTF_8);
-    } catch (TException e) {
-      throw new IOException("Failed to serialize server load : ", e);
-    } catch (UnsupportedEncodingException uee) {
-      throw new IOException("Failed to serialize server load : ", uee);
+
+    @Override
+    public synchronized int compareTo(Object o) {
+        ServerLoad other = (ServerLoad) o;
+        if (load == other.getLoad()) {
+            return server.compareTo(other.getServer());
+        } else {
+            return Long.compare(load, other.getLoad());
+        }
     }
-  }
-
-  public static ServerLoad deserialize(byte[] data) throws IOException {
-    com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
-        = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
-    TMemoryInputTransport transport = new TMemoryInputTransport(data);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      tServerLoad.read(protocol);
-      ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
-      if (tServerLoad.isSetStreams()) {
-        for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : tServerLoad.getStreams()) {
-          serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+
+    @Override
+    public synchronized boolean equals(Object o) {
+        if (!(o instanceof ServerLoad)) {
+            return false;
         }
-      }
-      return serverLoad;
-    } catch (TException e) {
-      throw new IOException("Failed to deserialize server load : ", e);
+        ServerLoad other = (ServerLoad) o;
+        return server.equals(other.getServer())
+            && load == other.getLoad()
+            && streamLoads.equals(other.getStreamLoads());
     }
-  }
-
-  @Override
-  public synchronized int compareTo(Object o) {
-    ServerLoad other = (ServerLoad) o;
-    if (load == other.getLoad()) {
-      return server.compareTo(other.getServer());
-    } else {
-      return Long.compare(load, other.getLoad());
+
+    @Override
+    public synchronized String toString() {
+        return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
     }
-  }
 
-  @Override
-  public synchronized boolean equals(Object o) {
-    if (!(o instanceof ServerLoad)) {
-      return false;
+    @Override
+    public synchronized int hashCode() {
+        return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
     }
-    ServerLoad other = (ServerLoad) o;
-    return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads());
-  }
-
-  @Override
-  public synchronized String toString() {
-    return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
-  }
-
-  @Override
-  public synchronized int hashCode() {
-    return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
index d7b7efd..c0b0ce1 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,96 +17,99 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
-
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.thrift.TException;
 import org.apache.thrift.protocol.TJSONProtocol;
 import org.apache.thrift.transport.TMemoryBuffer;
 import org.apache.thrift.transport.TMemoryInputTransport;
 
-import static com.google.common.base.Charsets.UTF_8;
-
 /**
- * A comparable data object containing the identifier of the stream and the appraised load produced
+ * An object representing the load of a stream.
+ *
+ * <p>A comparable data object containing the identifier of the stream and the appraised load produced
  * by the stream.
  */
 public class StreamLoad implements Comparable {
-  private static final int BUFFER_SIZE = 4096;
-  public final String stream;
-  private final int load;
+    private static final int BUFFER_SIZE = 4096;
+    public final String stream;
+    private final int load;
 
-  public StreamLoad(String stream, int load) {
-    this.stream = stream;
-    this.load = load;
-  }
+    public StreamLoad(String stream, int load) {
+        this.stream = stream;
+        this.load = load;
+    }
 
-  public int getLoad() {
-    return load;
-  }
+    public int getLoad() {
+        return load;
+    }
 
-  public String getStream() {
-    return stream;
-  }
+    public String getStream() {
+        return stream;
+    }
 
-  protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
-    com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
-    return tStreamLoad.setStream(stream).setLoad(load);
-  }
+    protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
+        com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
+            new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+        return tStreamLoad.setStream(stream).setLoad(load);
+    }
 
-  public byte[] serialize() throws IOException {
-    TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      toThrift().write(protocol);
-      transport.flush();
-      return transport.toString(UTF_8.name()).getBytes(UTF_8);
-    } catch (TException e) {
-      throw new IOException("Failed to serialize stream load : ", e);
-    } catch (UnsupportedEncodingException uee) {
-      throw new IOException("Failed to serialize stream load : ", uee);
+    public byte[] serialize() throws IOException {
+        TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            toThrift().write(protocol);
+            transport.flush();
+            return transport.toString(UTF_8.name()).getBytes(UTF_8);
+        } catch (TException e) {
+            throw new IOException("Failed to serialize stream load : ", e);
+        } catch (UnsupportedEncodingException uee) {
+            throw new IOException("Failed to serialize stream load : ", uee);
+        }
     }
-  }
 
-  public static StreamLoad deserialize(byte[] data) throws IOException {
-    com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
-    TMemoryInputTransport transport = new TMemoryInputTransport(data);
-    TJSONProtocol protocol = new TJSONProtocol(transport);
-    try {
-      tStreamLoad.read(protocol);
-      return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
-    } catch (TException e) {
-      throw new IOException("Failed to deserialize stream load : ", e);
+    public static StreamLoad deserialize(byte[] data) throws IOException {
+        com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
+            new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+        TMemoryInputTransport transport = new TMemoryInputTransport(data);
+        TJSONProtocol protocol = new TJSONProtocol(transport);
+        try {
+            tStreamLoad.read(protocol);
+            return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
+        } catch (TException e) {
+            throw new IOException("Failed to deserialize stream load : ", e);
+        }
     }
-  }
 
-  @Override
-  public int compareTo(Object o) {
-    StreamLoad other = (StreamLoad) o;
-    if (load == other.getLoad()) {
-      return stream.compareTo(other.getStream());
-    } else {
-      return Long.compare(load, other.getLoad());
+    @Override
+    public int compareTo(Object o) {
+        StreamLoad other = (StreamLoad) o;
+        if (load == other.getLoad()) {
+            return stream.compareTo(other.getStream());
+        } else {
+            return Long.compare(load, other.getLoad());
+        }
     }
-  }
 
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof StreamLoad)) {
-      return false;
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof StreamLoad)) {
+            return false;
+        }
+        StreamLoad other = (StreamLoad) o;
+        return stream.equals(other.getStream()) && load == other.getLoad();
     }
-    StreamLoad other = (StreamLoad) o;
-    return stream.equals(other.getStream()) && load == other.getLoad();
-  }
 
-  @Override
-  public String toString() {
-    return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
-  }
+    @Override
+    public String toString() {
+        return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
+    }
 
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(stream).append(load).build();
-  }
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder().append(stream).append(load).build();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
index 4f01bdc..977ae04 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,13 +17,16 @@
  */
 package com.twitter.distributedlog.service.placement;
 
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import com.twitter.distributedlog.util.Utils;
 import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.HashSet;
 import java.util.List;
 import java.util.TreeSet;
-
 import org.apache.bookkeeper.stats.StatsLogger;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.KeeperException;
@@ -35,139 +38,136 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.twitter.distributedlog.BKDistributedLogNamespace;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.util.Utils;
-
 /**
  * An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to
  * avoid necessitating an additional system for the resource placement.
  */
 public class ZKPlacementStateManager implements PlacementStateManager {
-  static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
-  private static final String SERVER_LOAD_DIR = "/.server-load";
 
-  private final String serverLoadPath;
-  private final ZooKeeperClient zkClient;
+    private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
 
-  private boolean watching = false;
+    private static final String SERVER_LOAD_DIR = "/.server-load";
 
-  public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
-    String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
-    zkClient = BKNamespaceDriver.createZKClientBuilder(
-        String.format("ZKPlacementStateManager-%s", zkServers),
-        conf,
-        zkServers,
-        statsLogger.scope("placement_state_manager")).build();
-    serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
-  }
+    private final String serverLoadPath;
+    private final ZooKeeperClient zkClient;
 
-  private void createServerLoadPathIfNoExists(byte[] data)
-        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
-    try {
-      Utils.zkCreateFullPathOptimistic(zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
-    } catch (KeeperException.NodeExistsException nee) {
-      logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
+    private boolean watching = false;
+
+    public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
+        String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
+        zkClient = BKNamespaceDriver.createZKClientBuilder(
+            String.format("ZKPlacementStateManager-%s", zkServers),
+            conf,
+            zkServers,
+            statsLogger.scope("placement_state_manager")).build();
+        serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
     }
-  }
-
-  @Override
-  public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
-    logger.info("saving ownership");
-    try {
-      ZooKeeper zk = zkClient.get();
-      // use timestamp as data so watchers will see any changes
-      byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-
-      if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
-        createServerLoadPathIfNoExists(timestamp);
-      }
-
-      Transaction tx = zk.transaction();
-      List<String> children = zk.getChildren(serverLoadPath, false);
-      HashSet<String> servers = new HashSet<String>(children);
-      tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
-      for (ServerLoad serverLoad : serverLoads) {
-        String server = serverToZkFormat(serverLoad.getServer());
-        String serverPath = serverPath(server);
-        if (servers.contains(server)) {
-          servers.remove(server);
-          tx.setData(serverPath, serverLoad.serialize(), -1);
-        } else {
-          tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+
+    private void createServerLoadPathIfNoExists(byte[] data)
+        throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+        try {
+            Utils.zkCreateFullPathOptimistic(
+                zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+        } catch (KeeperException.NodeExistsException nee) {
+            logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
         }
-      }
-      for (String server : servers) {
-        tx.delete(serverPath(server), -1);
-      }
-      tx.commit();
-    } catch (InterruptedException | IOException | KeeperException e) {
-      throw new StateManagerSaveException(e);
     }
-  }
-
-  @Override
-  public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
-    TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
-    try {
-      ZooKeeper zk = zkClient.get();
-      List<String> children = zk.getChildren(serverLoadPath, false);
-      for (String server : children) {
-        ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
-      }
-      return ownerships;
-    } catch (InterruptedException | IOException | KeeperException e) {
-      throw new StateManagerLoadException(e);
+
+    @Override
+    public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
+        logger.info("saving ownership");
+        try {
+            ZooKeeper zk = zkClient.get();
+            // use timestamp as data so watchers will see any changes
+            byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+
+            if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
+                createServerLoadPathIfNoExists(timestamp);
+            }
+
+            Transaction tx = zk.transaction();
+            List<String> children = zk.getChildren(serverLoadPath, false);
+            HashSet<String> servers = new HashSet<String>(children);
+            tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
+            for (ServerLoad serverLoad : serverLoads) {
+                String server = serverToZkFormat(serverLoad.getServer());
+                String serverPath = serverPath(server);
+                if (servers.contains(server)) {
+                    servers.remove(server);
+                    tx.setData(serverPath, serverLoad.serialize(), -1);
+                } else {
+                    tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+                }
+            }
+            for (String server : servers) {
+                tx.delete(serverPath(server), -1);
+            }
+            tx.commit();
+        } catch (InterruptedException | IOException | KeeperException e) {
+            throw new StateManagerSaveException(e);
+        }
     }
-  }
 
-  @Override
-  synchronized public void watch(final PlacementCallback callback) {
-    if (watching) {
-      return; // do not double watch
+    @Override
+    public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
+        TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+        try {
+            ZooKeeper zk = zkClient.get();
+            List<String> children = zk.getChildren(serverLoadPath, false);
+            for (String server : children) {
+                ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
+            }
+            return ownerships;
+        } catch (InterruptedException | IOException | KeeperException e) {
+            throw new StateManagerLoadException(e);
+        }
     }
-    watching = true;
-
-    try {
-      ZooKeeper zk = zkClient.get();
-      try {
-        zk.getData(serverLoadPath, new Watcher() {
-          @Override
-          public void process(WatchedEvent watchedEvent) {
+
+    @Override
+    public synchronized void watch(final PlacementCallback callback) {
+        if (watching) {
+            return; // do not double watch
+        }
+        watching = true;
+
+        try {
+            ZooKeeper zk = zkClient.get();
             try {
-              callback.callback(loadOwnership());
-            } catch (StateManagerLoadException e) {
-              logger.error("Watch of Ownership failed", e);
-            } finally {
-              watching = false;
-              watch(callback);
+                zk.getData(serverLoadPath, new Watcher() {
+                    @Override
+                    public void process(WatchedEvent watchedEvent) {
+                        try {
+                            callback.callback(loadOwnership());
+                        } catch (StateManagerLoadException e) {
+                            logger.error("Watch of Ownership failed", e);
+                        } finally {
+                            watching = false;
+                            watch(callback);
+                        }
+                    }
+                }, new Stat());
+            } catch (KeeperException.NoNodeException nee) {
+                byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+                createServerLoadPathIfNoExists(timestamp);
+                watching = false;
+                watch(callback);
             }
-          }
-        }, new Stat());
-      } catch (KeeperException.NoNodeException nee) {
-        byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-        createServerLoadPathIfNoExists(timestamp);
-        watching = false;
-        watch(callback);
-      }
-    } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
-      logger.error("Watch of Ownership failed", e);
-      watching = false;
-      watch(callback);
+        } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+            logger.error("Watch of Ownership failed", e);
+            watching = false;
+            watch(callback);
+        }
     }
-  }
 
-  public String serverPath(String server) {
-    return String.format("%s/%s", serverLoadPath, server);
-  }
+    public String serverPath(String server) {
+        return String.format("%s/%s", serverLoadPath, server);
+    }
 
-  protected String serverToZkFormat(String server) {
-    return server.replaceAll("/", "--");
-  }
+    protected String serverToZkFormat(String server) {
+        return server.replaceAll("/", "--");
+    }
 
-  protected String zkFormatToServer(String zkFormattedServer) {
-    return zkFormattedServer.replaceAll("--", "/");
-  }
+    protected String zkFormatToServer(String zkFormattedServer) {
+        return zkFormattedServer.replaceAll("--", "/");
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
new file mode 100644
index 0000000..72c134b
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Placement Policy to place streams across proxy services.
+ */
+package com.twitter.distributedlog.service.placement;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
index fbef587..b513e24 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
@@ -18,20 +18,18 @@
 package com.twitter.distributedlog.service.stream;
 
 import com.google.common.base.Stopwatch;
-
-import com.twitter.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Try;
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.exceptions.ChecksumFailedException;
 import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
-
+import com.twitter.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Try;
 import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -40,8 +38,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
 
+/**
+ * Abstract Stream Operation.
+ */
 public abstract class AbstractStreamOp<Response> implements StreamOp {
-    static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
+
+    private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
+
     protected final String stream;
     protected final OpStatsLogger opStatsLogger;
     private final Promise<Response> result = new Promise<Response>();
@@ -103,7 +106,7 @@ public abstract class AbstractStreamOp<Response> implements StreamOp {
     }
 
     /**
-     * Fail with current <i>owner</i> and its reason <i>t</i>
+     * Fail with current <i>owner</i> and its reason <i>t</i>.
      *
      * @param cause
      *          failure reason

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
index ae0d67d..a385b84 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
@@ -17,17 +17,18 @@
  */
 package com.twitter.distributedlog.service.stream;
 
-import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.distributedlog.service.ResponseUtils;
-import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
 import com.twitter.util.Future;
-
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.OpStatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Abstract Write Operation.
+ */
 public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> {
 
     protected AbstractWriteOp(String stream,

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
index c009bb9..4d50b66 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
@@ -17,19 +17,13 @@
  */
 package com.twitter.distributedlog.service.stream;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.LogRecord;
 import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
 import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.exceptions.LockingException;
 import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
 import com.twitter.distributedlog.exceptions.RequestDeniedException;
 import com.twitter.distributedlog.service.ResponseUtils;
@@ -39,21 +33,26 @@ import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
 import com.twitter.distributedlog.thrift.service.ResponseHeader;
 import com.twitter.distributedlog.thrift.service.StatusCode;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.util.FutureUtils;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.ConstFuture;
+import com.twitter.util.Future;
 import com.twitter.util.Future$;
 import com.twitter.util.FutureEventListener;
-import com.twitter.util.Future;
 import com.twitter.util.Try;
-
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.OpStatsLogger;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Bulk Write Operation.
+ */
 public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload {
     private final List<ByteBuffer> buffers;
     private final long payloadSize;
@@ -77,9 +76,9 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements
         try {
             result.get();
         } catch (Exception ex) {
-            if (ex instanceof OwnershipAcquireFailedException ||
-                ex instanceof AlreadyClosedException ||
-                ex instanceof LockingException) {
+            if (ex instanceof OwnershipAcquireFailedException
+                || ex instanceof AlreadyClosedException
+                || ex instanceof LockingException) {
                 def = true;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
index 1dde1f9..e30a989 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
@@ -25,13 +25,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Operation to delete a log stream.
+ */
 public class DeleteOp extends AbstractWriteOp {
     private final StreamManager streamManager;
     private final Counter deniedDeleteCounter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
index 4b2cbc1..f34295b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
@@ -17,6 +17,8 @@
  */
 package com.twitter.distributedlog.service.stream;
 
+import static com.google.common.base.Charsets.UTF_8;
+
 import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.BKAsyncLogWriter;
 import com.twitter.distributedlog.DLSN;
@@ -28,15 +30,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
-import static com.google.common.base.Charsets.UTF_8;
-
+/**
+ * Heartbeat Operation.
+ */
 public class HeartbeatOp extends AbstractWriteOp {
 
     static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8);

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
index 25835f6..aa0f1a7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
@@ -25,13 +25,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
 import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
 import org.apache.bookkeeper.stats.StatsLogger;
-
 import scala.runtime.AbstractFunction1;
 
+/**
+ * Operation to release ownership of a log stream.
+ */
 public class ReleaseOp extends AbstractWriteOp {
     private final StreamManager streamManager;
     private final Counter deniedReleaseCounter;

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
index a1e3e4f..e015e29 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
@@ -23,13 +23,14 @@ import com.twitter.util.Future;
 import java.io.IOException;
 
 /**
- * Stream is the per stream request handler in the DL service layer. The collection of Streams in
- * the proxy are managed by StreamManager.
+ * Stream is the per stream request handler in the DL service layer.
+ *
+ * <p>The collection of Streams in the proxy are managed by StreamManager.
  */
 public interface Stream {
 
     /**
-     * Get the stream configuration for this stream
+     * Get the stream configuration for this stream.
      *
      * @return stream configuration
      */

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
index 51d7ffd..0dfbd69 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
@@ -19,6 +19,9 @@ package com.twitter.distributedlog.service.stream;
 
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
 
+/**
+ * Factory to create a stream with provided stream configuration {@code streamConf}.
+ */
 public interface StreamFactory {
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
index cb28f1e..566ded6 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
@@ -29,6 +29,9 @@ import com.twitter.util.Timer;
 import org.apache.bookkeeper.feature.FeatureProvider;
 import org.jboss.netty.util.HashedWheelTimer;
 
+/**
+ * The implementation of {@link StreamFactory}.
+ */
 public class StreamFactoryImpl implements StreamFactory {
     private final String clientId;
     private final StreamOpStats streamOpStats;