You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/04 08:44:00 UTC
[1/4] incubator-distributedlog git commit: DL-132: Enable check style
for distributedlog service module.
Repository: incubator-distributedlog
Updated Branches:
refs/heads/master 32a52a9f7 -> 1a30b0ceb
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java
index 4036298..d9c2ad1 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestCountBasedStreamChooser.java
@@ -17,18 +17,24 @@
*/
package com.twitter.distributedlog.service.balancer;
-import com.google.common.collect.Sets;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import com.google.common.collect.Sets;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.junit.Test;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link CountBasedStreamChooser}.
+ */
public class TestCountBasedStreamChooser {
@Test(timeout = 60000)
@@ -52,7 +58,7 @@ public class TestCountBasedStreamChooser {
}
CountBasedStreamChooser chooser = new CountBasedStreamChooser(streamDistribution);
- for (int k = 0; k < i+1; k++) {
+ for (int k = 0; k < i + 1; k++) {
assertNull(chooser.choose());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java
index 59d1d10..04656bc 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestSimpleBalancer.java
@@ -17,27 +17,29 @@
*/
package com.twitter.distributedlog.service.balancer;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
-import com.twitter.distributedlog.service.DistributedLogClient;
import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
import com.twitter.distributedlog.service.DistributedLogServerTestCase;
import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import java.util.Set;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.Set;
-
-import static com.google.common.base.Charsets.*;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link SimpleBalancer}.
+ */
public class TestSimpleBalancer extends DistributedLogServerTestCase {
- static final Logger logger = LoggerFactory.getLogger(TestSimpleBalancer.class);
+ private static final Logger logger = LoggerFactory.getLogger(TestSimpleBalancer.class);
DLClient targetClient;
DLServer targetServer;
@@ -82,7 +84,7 @@ public class TestSimpleBalancer extends DistributedLogServerTestCase {
// write to multiple streams
for (int i = 0; i < numStreams; i++) {
String name = namePrefix + i;
- Await.result(((DistributedLogClient) dlClient.dlClient).write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
+ Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
}
// validation
@@ -139,7 +141,7 @@ public class TestSimpleBalancer extends DistributedLogServerTestCase {
// write to multiple streams
for (int i = 0; i < numStreams; i++) {
String name = namePrefix + i;
- Await.result(((DistributedLogClient) dlClient.dlClient).write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
+ Await.result(dlClient.dlClient.write(name, ByteBuffer.wrap(("" + i).getBytes(UTF_8))));
}
// validation
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java
index 3f90f35..d666cf7 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestStreamMover.java
@@ -17,20 +17,22 @@
*/
package com.twitter.distributedlog.service.balancer;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertTrue;
+
import com.google.common.base.Optional;
import com.twitter.distributedlog.service.DistributedLogClient;
import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
import com.twitter.distributedlog.service.DistributedLogServerTestCase;
import com.twitter.util.Await;
+import java.nio.ByteBuffer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.nio.ByteBuffer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link StreamMover}.
+ */
public class TestStreamMover extends DistributedLogServerTestCase {
DLClient targetClient;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java
index c76dcb6..85ceb95 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestServerConfiguration.java
@@ -17,9 +17,15 @@
*/
package com.twitter.distributedlog.service.config;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import org.junit.Test;
-import static org.junit.Assert.*;
+/**
+ * Test Case for {@link ServerConfiguration}.
+ */
public class TestServerConfiguration {
@Test(timeout = 60000, expected = IllegalArgumentException.class)
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java
index e934879..462f4f3 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/config/TestStreamConfigProvider.java
@@ -17,21 +17,25 @@
*/
package com.twitter.distributedlog.service.config;
+import static com.twitter.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.config.PropertiesWriter;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
-import org.junit.Test;
-
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.junit.Test;
-import static com.twitter.distributedlog.DistributedLogConfiguration.BKDL_RETENTION_PERIOD_IN_HOURS;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link StreamConfigProvider}.
+ */
public class TestStreamConfigProvider {
private static final String DEFAULT_CONFIG_DIR = "conf";
private final String defaultConfigPath;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
index bde33c6..a12a64e 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestLeastLoadPlacementPolicy.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,20 @@
*/
package com.twitter.distributedlog.service.placement;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Await;
+import com.twitter.util.Duration;
+import com.twitter.util.Future;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -25,139 +39,138 @@ import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicInteger;
-
-import scala.runtime.BoxedUnit;
-
import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Await;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
+/**
+ * Test Case for {@link LeastLoadPlacementPolicy}.
+ */
public class TestLeastLoadPlacementPolicy {
- @Test(timeout = 10000)
- public void testCalculateBalances() throws Exception {
- int numSevers = new Random().nextInt(20) + 1;
- int numStreams = new Random().nextInt(200) + 1;
- RoutingService mockRoutingService = mock(RoutingService.class);
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
- LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
- new EqualLoadAppraiser(), mockRoutingService, mockNamespace, null, Duration.fromSeconds(600), new NullStatsLogger());
- TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
- long lowLoadPerServer = numStreams / numSevers;
- long highLoadPerServer = lowLoadPerServer + 1;
- for (ServerLoad serverLoad: serverLoads) {
- long load = serverLoad.getLoad();
- assertEquals(load, serverLoad.getStreamLoads().size());
- assertTrue(String.format("Load %d is not between %d and %d", load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer);
+ @Test(timeout = 10000)
+ public void testCalculateBalances() throws Exception {
+ int numSevers = new Random().nextInt(20) + 1;
+ int numStreams = new Random().nextInt(200) + 1;
+ RoutingService mockRoutingService = mock(RoutingService.class);
+ DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+ LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+ new EqualLoadAppraiser(),
+ mockRoutingService,
+ mockNamespace,
+ null,
+ Duration.fromSeconds(600),
+ new NullStatsLogger());
+ TreeSet<ServerLoad> serverLoads =
+ Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
+ long lowLoadPerServer = numStreams / numSevers;
+ long highLoadPerServer = lowLoadPerServer + 1;
+ for (ServerLoad serverLoad : serverLoads) {
+ long load = serverLoad.getLoad();
+ assertEquals(load, serverLoad.getStreamLoads().size());
+ assertTrue(String.format("Load %d is not between %d and %d",
+ load, lowLoadPerServer, highLoadPerServer), load == lowLoadPerServer || load == highLoadPerServer);
+ }
}
- }
- @Test(timeout = 10000)
- public void testRefreshAndPlaceStream() throws Exception {
- int numSevers = new Random().nextInt(20) + 1;
- int numStreams = new Random().nextInt(200) + 1;
- RoutingService mockRoutingService = mock(RoutingService.class);
- when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
- try {
- when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
- } catch (IOException e) {
- fail();
- }
- PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
- LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
- new EqualLoadAppraiser(), mockRoutingService, mockNamespace, mockPlacementStateManager, Duration.fromSeconds(600), new NullStatsLogger());
- leastLoadPlacementPolicy.refresh();
+ @Test(timeout = 10000)
+ public void testRefreshAndPlaceStream() throws Exception {
+ int numSevers = new Random().nextInt(20) + 1;
+ int numStreams = new Random().nextInt(200) + 1;
+ RoutingService mockRoutingService = mock(RoutingService.class);
+ when(mockRoutingService.getHosts()).thenReturn(generateSocketAddresses(numSevers));
+ DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+ try {
+ when(mockNamespace.getLogs()).thenReturn(generateStreams(numStreams).iterator());
+ } catch (IOException e) {
+ fail();
+ }
+ PlacementStateManager mockPlacementStateManager = mock(PlacementStateManager.class);
+ LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+ new EqualLoadAppraiser(),
+ mockRoutingService,
+ mockNamespace,
+ mockPlacementStateManager,
+ Duration.fromSeconds(600),
+ new NullStatsLogger());
+ leastLoadPlacementPolicy.refresh();
- final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
- verify(mockPlacementStateManager).saveOwnership(captor.capture());
- TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>)captor.getValue();
- ServerLoad next = serverLoads.first();
- String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
- assertEquals(next.getServer(), serverPlacement);
- }
+ final ArgumentCaptor<TreeSet> captor = ArgumentCaptor.forClass(TreeSet.class);
+ verify(mockPlacementStateManager).saveOwnership(captor.capture());
+ TreeSet<ServerLoad> serverLoads = (TreeSet<ServerLoad>) captor.getValue();
+ ServerLoad next = serverLoads.first();
+ String serverPlacement = Await.result(leastLoadPlacementPolicy.placeStream("newstream1"));
+ assertEquals(next.getServer(), serverPlacement);
+ }
- @Test(timeout = 10000)
- public void testCalculateUnequalWeight() throws Exception {
- int numSevers = new Random().nextInt(20) + 1;
- int numStreams = new Random().nextInt(200) + 1;
+ @Test(timeout = 10000)
+ public void testCalculateUnequalWeight() throws Exception {
+ int numSevers = new Random().nextInt(20) + 1;
+ int numStreams = new Random().nextInt(200) + 1;
/* use AtomicInteger to have a final object in answer method */
- final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
- RoutingService mockRoutingService = mock(RoutingService.class);
- DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
- LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
- when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
- @Override
- public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable {
- int load = new Random().nextInt(100000);
- if (load > maxLoad.get()) {
- maxLoad.set(load);
+ final AtomicInteger maxLoad = new AtomicInteger(Integer.MIN_VALUE);
+ RoutingService mockRoutingService = mock(RoutingService.class);
+ DistributedLogNamespace mockNamespace = mock(DistributedLogNamespace.class);
+ LoadAppraiser mockLoadAppraiser = mock(LoadAppraiser.class);
+ when(mockLoadAppraiser.getStreamLoad(anyString())).then(new Answer<Future<StreamLoad>>() {
+ @Override
+ public Future<StreamLoad> answer(InvocationOnMock invocationOnMock) throws Throwable {
+ int load = new Random().nextInt(100000);
+ if (load > maxLoad.get()) {
+ maxLoad.set(load);
+ }
+ return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
+ }
+ });
+ LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
+ mockLoadAppraiser,
+ mockRoutingService,
+ mockNamespace,
+ null,
+ Duration.fromSeconds(600),
+ new NullStatsLogger());
+ TreeSet<ServerLoad> serverLoads =
+ Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
+ long highestLoadSeen = Long.MIN_VALUE;
+ long lowestLoadSeen = Long.MAX_VALUE;
+ for (ServerLoad serverLoad : serverLoads) {
+ long load = serverLoad.getLoad();
+ if (load < lowestLoadSeen) {
+ lowestLoadSeen = load;
+ }
+ if (load > highestLoadSeen) {
+ highestLoadSeen = load;
+ }
}
- return Future.value(new StreamLoad(invocationOnMock.getArguments()[0].toString(), load));
- }
- });
- LeastLoadPlacementPolicy leastLoadPlacementPolicy = new LeastLoadPlacementPolicy(
- mockLoadAppraiser, mockRoutingService, mockNamespace, null, Duration.fromSeconds(600), new NullStatsLogger());
- TreeSet<ServerLoad> serverLoads = Await.result(leastLoadPlacementPolicy.calculate(generateServers(numSevers), generateStreams(numStreams)));
- long highestLoadSeen = Long.MIN_VALUE;
- long lowestLoadSeen = Long.MAX_VALUE;
- for (ServerLoad serverLoad: serverLoads) {
- long load = serverLoad.getLoad();
- if (load < lowestLoadSeen) {
- lowestLoadSeen = load;
- }
- if (load > highestLoadSeen) {
- highestLoadSeen = load;
- }
- }
- assertTrue("Unexpected placement for " + numStreams + " streams to "
- + numSevers + " servers : highest load = " + highestLoadSeen
- + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(),
+ assertTrue("Unexpected placement for " + numStreams + " streams to "
+ + numSevers + " servers : highest load = " + highestLoadSeen
+ + ", lowest load = " + lowestLoadSeen + ", max stream load = " + maxLoad.get(),
highestLoadSeen - lowestLoadSeen < maxLoad.get());
- }
+ }
- private Set<SocketAddress> generateSocketAddresses(int num) {
- LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
- for (int i = 0; i < num; i++) {
- socketAddresses.add(new InetSocketAddress(i));
+ private Set<SocketAddress> generateSocketAddresses(int num) {
+ LinkedHashSet<SocketAddress> socketAddresses = new LinkedHashSet<SocketAddress>();
+ for (int i = 0; i < num; i++) {
+ socketAddresses.add(new InetSocketAddress(i));
+ }
+ return socketAddresses;
}
- return socketAddresses;
- }
- private Set<String> generateStreams(int num) {
- LinkedHashSet<String> streams = new LinkedHashSet<String>();
- for (int i = 0; i < num; i++) {
- streams.add("stream_" + i);
+ private Set<String> generateStreams(int num) {
+ LinkedHashSet<String> streams = new LinkedHashSet<String>();
+ for (int i = 0; i < num; i++) {
+ streams.add("stream_" + i);
+ }
+ return streams;
}
- return streams;
- }
- private Set<String> generateServers(int num) {
- LinkedHashSet<String> servers = new LinkedHashSet<String>();
- for (int i = 0; i < num; i++) {
- servers.add("server_" + i);
+ private Set<String> generateServers(int num) {
+ LinkedHashSet<String> servers = new LinkedHashSet<String>();
+ for (int i = 0; i < num; i++) {
+ servers.add("server_" + i);
+ }
+ return servers;
}
- return servers;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
index d844f78..42aeddd 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestServerLoad.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,32 +17,34 @@
*/
package com.twitter.distributedlog.service.placement;
-import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
+/**
+ * Test Case for {@link ServerLoad}.
+ */
public class TestServerLoad {
- @Test(timeout = 60000)
- public void testSerializeDeserialize() throws IOException {
- final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
- for (int i = 0; i < 20; i++) {
- serverLoad.addStream(new StreamLoad("stream-"+i, i));
+ @Test(timeout = 60000)
+ public void testSerializeDeserialize() throws IOException {
+ final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+ for (int i = 0; i < 20; i++) {
+ serverLoad.addStream(new StreamLoad("stream-" + i, i));
+ }
+ assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
}
- assertEquals(serverLoad, ServerLoad.deserialize(serverLoad.serialize()));
- }
- @Test(timeout = 60000)
- public void testGetLoad() throws IOException {
- final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
- assertEquals(0, serverLoad.getLoad());
- serverLoad.addStream(new StreamLoad("stream-"+1, 3));
- assertEquals(3, serverLoad.getLoad());
- serverLoad.addStream(new StreamLoad("stream-"+2, 7));
- assertEquals(10, serverLoad.getLoad());
- serverLoad.addStream(new StreamLoad("stream-"+3, 1));
- assertEquals(11, serverLoad.getLoad());
- }
+ @Test(timeout = 60000)
+ public void testGetLoad() throws IOException {
+ final ServerLoad serverLoad = new ServerLoad("th1s1s@s3rv3rn@m3");
+ assertEquals(0, serverLoad.getLoad());
+ serverLoad.addStream(new StreamLoad("stream-" + 1, 3));
+ assertEquals(3, serverLoad.getLoad());
+ serverLoad.addStream(new StreamLoad("stream-" + 2, 7));
+ assertEquals(10, serverLoad.getLoad());
+ serverLoad.addStream(new StreamLoad("stream-" + 3, 1));
+ assertEquals(11, serverLoad.getLoad());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
index e5091f5..aac30d4 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestStreamLoad.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,19 +17,21 @@
*/
package com.twitter.distributedlog.service.placement;
-import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+import java.io.IOException;
import org.junit.Test;
-import static org.junit.Assert.assertEquals;
-
+/**
+ * Test Case for {@link StreamLoad}.
+ */
public class TestStreamLoad {
- @Test(timeout = 10000)
- public void testSerializeDeserialize() throws IOException {
- final String streamName = "aHellaRandomStreamName";
- final int load = 1337;
- final StreamLoad streamLoad = new StreamLoad(streamName, load);
- assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize()));
- }
+ @Test(timeout = 10000)
+ public void testSerializeDeserialize() throws IOException {
+ final String streamName = "aHellaRandomStreamName";
+ final int load = 1337;
+ final StreamLoad streamLoad = new StreamLoad(streamName, load);
+ assertEquals(streamLoad, StreamLoad.deserialize(streamLoad.serialize()));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
index c02492d..1d11219 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/placement/TestZKPlacementStateManager.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,117 +17,120 @@
*/
package com.twitter.distributedlog.service.placement;
+import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import com.twitter.distributedlog.DistributedLogConfiguration;
import java.io.IOException;
import java.net.URI;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.LinkedBlockingQueue;
-
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-
-import static com.twitter.distributedlog.LocalDLMEmulator.DLOG_NAMESPACE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
+/**
+ * Test Case for {@link ZKPlacementStateManager}.
+ */
public class TestZKPlacementStateManager {
- private TestingServer zkTestServer;
- private String zkServers;
- private URI uri;
- private ZKPlacementStateManager zkPlacementStateManager;
-
- @Before
- public void startZookeeper() throws Exception {
- zkTestServer = new TestingServer(2181);
- zkServers = "127.0.0.1:2181";
- uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
- zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
- }
-
- @Test(timeout = 60000)
- public void testSaveLoad() throws Exception {
- TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
- zkPlacementStateManager.saveOwnership(ownerships);
- SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
- assertEquals(ownerships, loadedOwnerships);
-
- ownerships.add(new ServerLoad("emptyServer"));
- zkPlacementStateManager.saveOwnership(ownerships);
- loadedOwnerships = zkPlacementStateManager.loadOwnership();
- assertEquals(ownerships, loadedOwnerships);
-
- ServerLoad sl1 = new ServerLoad("server1");
- sl1.addStream(new StreamLoad("stream1", 3));
- sl1.addStream(new StreamLoad("stream2", 4));
- ServerLoad sl2 = new ServerLoad("server2");
- sl2.addStream(new StreamLoad("stream3", 1));
- ownerships.add(sl1);
- ownerships.add(sl2);
- zkPlacementStateManager.saveOwnership(ownerships);
- loadedOwnerships = zkPlacementStateManager.loadOwnership();
- assertEquals(ownerships, loadedOwnerships);
+ private TestingServer zkTestServer;
+ private String zkServers;
+ private URI uri;
+ private ZKPlacementStateManager zkPlacementStateManager;
+
+ @Before
+ public void startZookeeper() throws Exception {
+ zkTestServer = new TestingServer(2181);
+ zkServers = "127.0.0.1:2181";
+ uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
+ zkPlacementStateManager =
+ new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+ }
- loadedOwnerships.remove(sl1);
- zkPlacementStateManager.saveOwnership(ownerships);
- loadedOwnerships = zkPlacementStateManager.loadOwnership();
- assertEquals(ownerships, loadedOwnerships);
- }
+ @Test(timeout = 60000)
+ public void testSaveLoad() throws Exception {
+ TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+ zkPlacementStateManager.saveOwnership(ownerships);
+ SortedSet<ServerLoad> loadedOwnerships = zkPlacementStateManager.loadOwnership();
+ assertEquals(ownerships, loadedOwnerships);
+
+ ownerships.add(new ServerLoad("emptyServer"));
+ zkPlacementStateManager.saveOwnership(ownerships);
+ loadedOwnerships = zkPlacementStateManager.loadOwnership();
+ assertEquals(ownerships, loadedOwnerships);
+
+ ServerLoad sl1 = new ServerLoad("server1");
+ sl1.addStream(new StreamLoad("stream1", 3));
+ sl1.addStream(new StreamLoad("stream2", 4));
+ ServerLoad sl2 = new ServerLoad("server2");
+ sl2.addStream(new StreamLoad("stream3", 1));
+ ownerships.add(sl1);
+ ownerships.add(sl2);
+ zkPlacementStateManager.saveOwnership(ownerships);
+ loadedOwnerships = zkPlacementStateManager.loadOwnership();
+ assertEquals(ownerships, loadedOwnerships);
+
+ loadedOwnerships.remove(sl1);
+ zkPlacementStateManager.saveOwnership(ownerships);
+ loadedOwnerships = zkPlacementStateManager.loadOwnership();
+ assertEquals(ownerships, loadedOwnerships);
+ }
- private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
- LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
- int expectedNumServerLoads) throws InterruptedException {
- TreeSet<ServerLoad> notification = notificationQueue.take();
- assertNotNull(notification);
- while (notification.size() < expectedNumServerLoads) {
- notification = notificationQueue.take();
+ private TreeSet<ServerLoad> waitForServerLoadsNotificationAsc(
+ LinkedBlockingQueue<TreeSet<ServerLoad>> notificationQueue,
+ int expectedNumServerLoads) throws InterruptedException {
+ TreeSet<ServerLoad> notification = notificationQueue.take();
+ assertNotNull(notification);
+ while (notification.size() < expectedNumServerLoads) {
+ notification = notificationQueue.take();
+ }
+ assertEquals(expectedNumServerLoads, notification.size());
+ return notification;
}
- assertEquals(expectedNumServerLoads, notification.size());
- return notification;
- }
- @Test(timeout = 60000)
- public void testWatchIndefinitely() throws Exception {
- TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
- ownerships.add(new ServerLoad("server1"));
- final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
+ @Test(timeout = 60000)
+ public void testWatchIndefinitely() throws Exception {
+ TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+ ownerships.add(new ServerLoad("server1"));
+ final LinkedBlockingQueue<TreeSet<ServerLoad>> serverLoadNotifications =
new LinkedBlockingQueue<TreeSet<ServerLoad>>();
- PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
- @Override
- public void callback(TreeSet<ServerLoad> serverLoads) {
- serverLoadNotifications.add(serverLoads);
- }
- };
- zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
- zkPlacementStateManager.watch(callback);
- // cannot verify the callback here as it may call before the verify is called
-
- zkPlacementStateManager.saveOwnership(ownerships);
- assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
-
- ServerLoad server2 = new ServerLoad("server2");
- server2.addStream(new StreamLoad("hella-important-stream", 415));
- ownerships.add(server2);
- zkPlacementStateManager.saveOwnership(ownerships);
- assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
- }
+ PlacementStateManager.PlacementCallback callback = new PlacementStateManager.PlacementCallback() {
+ @Override
+ public void callback(TreeSet<ServerLoad> serverLoads) {
+ serverLoadNotifications.add(serverLoads);
+ }
+ };
+ zkPlacementStateManager.saveOwnership(ownerships); // need to initialize the zk path before watching
+ zkPlacementStateManager.watch(callback);
+ // cannot verify the callback here as it may call before the verify is called
+
+ zkPlacementStateManager.saveOwnership(ownerships);
+ assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 1));
+
+ ServerLoad server2 = new ServerLoad("server2");
+ server2.addStream(new StreamLoad("hella-important-stream", 415));
+ ownerships.add(server2);
+ zkPlacementStateManager.saveOwnership(ownerships);
+ assertEquals(ownerships, waitForServerLoadsNotificationAsc(serverLoadNotifications, 2));
+ }
- @Test(timeout = 60000)
- public void testZkFormatting() throws Exception {
- final String server = "host/10.0.0.0:31351";
- final String zkFormattedServer = "host--10.0.0.0:31351";
- URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
- ZKPlacementStateManager zkPlacementStateManager = new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
- assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
- assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
- }
+ @Test(timeout = 60000)
+ public void testZkFormatting() throws Exception {
+ final String server = "host/10.0.0.0:31351";
+ final String zkFormattedServer = "host--10.0.0.0:31351";
+ URI uri = new URI("distributedlog-bk://" + zkServers + DLOG_NAMESPACE + "/bknamespace");
+ ZKPlacementStateManager zkPlacementStateManager =
+ new ZKPlacementStateManager(uri, new DistributedLogConfiguration(), NullStatsLogger.INSTANCE);
+ assertEquals(zkFormattedServer, zkPlacementStateManager.serverToZkFormat(server));
+ assertEquals(server, zkPlacementStateManager.zkFormatToServer(zkFormattedServer));
+ }
- @After
- public void stopZookeeper() throws IOException {
- zkTestServer.stop();
- }
+ @After
+ public void stopZookeeper() throws IOException {
+ zkTestServer.stop();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java
index 71e46db..283c290 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamManager.java
@@ -17,8 +17,13 @@
*/
package com.twitter.distributedlog.service.stream;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
@@ -27,13 +32,12 @@ import com.twitter.distributedlog.service.config.StreamConfigProvider;
import com.twitter.distributedlog.service.streamset.Partition;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.util.Await;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
-
/**
* Test Case for StreamManager.
*/
@@ -53,9 +57,9 @@ public class TestStreamManager {
StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class);
StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class);
when(mockStreamFactory.create(
- (String)any(),
+ (String) any(),
(DynamicDistributedLogConfiguration) any(),
- (StreamManager)any())).thenReturn(mockStream);
+ (StreamManager) any())).thenReturn(mockStream);
StreamManager streamManager = new StreamManagerImpl(
"",
new DistributedLogConfiguration(),
@@ -108,7 +112,11 @@ public class TestStreamManager {
StreamFactory mockStreamFactory = mock(StreamFactory.class);
StreamPartitionConverter mockPartitionConverter = mock(StreamPartitionConverter.class);
StreamConfigProvider mockStreamConfigProvider = mock(StreamConfigProvider.class);
- when(mockStreamFactory.create((String)any(), (DynamicDistributedLogConfiguration)any(), (StreamManager)any())).thenReturn(mockStream);
+ when(mockStreamFactory.create(
+ (String) any(),
+ (DynamicDistributedLogConfiguration) any(),
+ (StreamManager) any())
+ ).thenReturn(mockStream);
DistributedLogNamespace dlNamespace = mock(DistributedLogNamespace.class);
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
index 41b4c69..7e52ff2 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/TestStreamOp.java
@@ -17,6 +17,11 @@
*/
package com.twitter.distributedlog.service.stream;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.LogRecord;
@@ -30,37 +35,21 @@ import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.Await;
import com.twitter.util.Future;
+import java.nio.ByteBuffer;
import org.apache.bookkeeper.feature.SettableFeature;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-import java.util.zip.CRC32;
-
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
/**
* Test Case for StreamOps.
*/
public class TestStreamOp {
- static final Logger logger = LoggerFactory.getLogger(TestStreamOp.class);
-
@Rule
public TestName testName = new TestName();
- private final ThreadLocal<CRC32> requestCRC = new ThreadLocal<CRC32>() {
- @Override
- protected CRC32 initialValue() {
- return new CRC32();
- }
- };
-
private WriteOp getWriteOp() {
SettableFeature disabledFeature = new SettableFeature("", 0);
return new WriteOp("test",
@@ -69,7 +58,7 @@ public class TestStreamOp {
new NullStatsLogger(),
new IdentityStreamPartitionConverter(),
new ServerConfiguration(),
- (byte)0,
+ (byte) 0,
null,
false,
disabledFeature,
@@ -90,7 +79,7 @@ public class TestStreamOp {
@Test(timeout = 60000)
public void testResponseSucceededThenFailed() throws Exception {
AsyncLogWriter writer = mock(AsyncLogWriter.class);
- when(writer.write((LogRecord)any())).thenReturn(Future.value(new DLSN(1,2,3)));
+ when(writer.write((LogRecord) any())).thenReturn(Future.value(new DLSN(1, 2, 3)));
when(writer.getStreamName()).thenReturn("test");
WriteOp writeOp = getWriteOp();
writeOp.execute(writer, new Sequencer() {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
index e2fa53b..a65e51a 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/stream/limiter/TestServiceRequestLimiter.java
@@ -17,31 +17,33 @@
*/
package com.twitter.distributedlog.service.stream.limiter;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.config.ConcurrentConstConfiguration;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.OverCapacityException;
+import com.twitter.distributedlog.limiter.ChainedRequestLimiter;
+import com.twitter.distributedlog.limiter.ComposableRequestLimiter;
import com.twitter.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
-import com.twitter.distributedlog.limiter.ComposableRequestLimiter;
-import com.twitter.distributedlog.limiter.ChainedRequestLimiter;
import com.twitter.distributedlog.limiter.GuavaRateLimiter;
import com.twitter.distributedlog.limiter.RateLimiter;
import com.twitter.distributedlog.limiter.RequestLimiter;
-
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.bookkeeper.feature.SettableFeature;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.junit.Assert.*;
+/**
+ * Test Case for {@link ServiceRequestLimiter}.
+ */
public class TestServiceRequestLimiter {
- static final Logger LOG = LoggerFactory.getLogger(TestServiceRequestLimiter.class);
+ /**
+ * Mock Request.
+ */
class MockRequest {
int size;
MockRequest() {
@@ -55,11 +57,17 @@ public class TestServiceRequestLimiter {
}
}
+ /**
+ * Mock request limiter.
+ */
class MockRequestLimiter implements RequestLimiter<MockRequest> {
public void apply(MockRequest request) {
}
}
+ /**
+ * Counter based limiter.
+ */
static class CounterLimiter implements RateLimiter {
final int limit;
int count;
@@ -78,6 +86,9 @@ public class TestServiceRequestLimiter {
}
}
+ /**
+ * Mock hard request limiter.
+ */
class MockHardRequestLimiter implements RequestLimiter<MockRequest> {
RequestLimiter<MockRequest> limiter;
@@ -114,6 +125,9 @@ public class TestServiceRequestLimiter {
}
}
+ /**
+ * Mock soft request limiter.
+ */
class MockSoftRequestLimiter implements RequestLimiter<MockRequest> {
RequestLimiter<MockRequest> limiter;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java
index 2275bd6..af7b7e5 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestDelimiterStreamPartitionConverter.java
@@ -17,12 +17,12 @@
*/
package com.twitter.distributedlog.service.streamset;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.*;
+import org.junit.Test;
/**
- * Test Cases for {@link DelimiterStreamPartitionConverter}
+ * Test Cases for {@link DelimiterStreamPartitionConverter}.
*/
public class TestDelimiterStreamPartitionConverter {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java
index 327faf2..d0c6660 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestIdentityStreamPartitionConverter.java
@@ -17,10 +17,14 @@
*/
package com.twitter.distributedlog.service.streamset;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.*;
+import org.junit.Test;
+/**
+ * Test Case for {@link IdentityStreamPartitionConverter}.
+ */
public class TestIdentityStreamPartitionConverter {
@Test(timeout = 20000)
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java
index e87fdc6..529397e 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/streamset/TestPartitionMap.java
@@ -17,12 +17,13 @@
*/
package com.twitter.distributedlog.service.streamset;
-import org.junit.Test;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.*;
+import org.junit.Test;
/**
- * Test {@link PartitionMap}
+ * Test {@link PartitionMap}.
*/
public class TestPartitionMap {
[4/4] incubator-distributedlog git commit: DL-132: Enable check style
for distributedlog service module.
Posted by si...@apache.org.
DL-132: Enable check style for distributedlog service module.
Author: Xi Liu <xi...@gmail.com>
Reviewers: Sijie Guo <si...@apache.org>
Closes #89 from xiliuant/xi/checkstyle_service
Project: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/commit/1a30b0ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/tree/1a30b0ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/diff/1a30b0ce
Branch: refs/heads/master
Commit: 1a30b0ceb76f33eda08b611d97c150f45f239a95
Parents: 32a52a9
Author: Xi Liu <xi...@gmail.com>
Authored: Wed Jan 4 00:43:56 2017 -0800
Committer: Sijie Guo <si...@apache.org>
Committed: Wed Jan 4 00:43:56 2017 -0800
----------------------------------------------------------------------
.../resources/distributedlog/checkstyle.xml | 2 +-
distributedlog-service/pom.xml | 33 ++
.../distributedlog/service/ClientUtils.java | 3 +
.../service/DistributedLogCluster.java | 78 ++---
.../service/DistributedLogServer.java | 64 ++--
.../service/DistributedLogServerApp.java | 38 ++-
.../service/DistributedLogServiceImpl.java | 48 +--
.../service/FatalErrorHandler.java | 7 -
.../distributedlog/service/MonitorService.java | 29 +-
.../service/MonitorServiceApp.java | 16 +-
.../service/ServerFeatureKeys.java | 2 +-
.../distributedlog/service/StatsFilter.java | 8 +-
.../service/announcer/Announcer.java | 2 +-
.../service/announcer/NOPAnnouncer.java | 3 +
.../service/announcer/ServerSetAnnouncer.java | 10 +-
.../service/announcer/package-info.java | 21 ++
.../service/balancer/Balancer.java | 5 +
.../service/balancer/BalancerTool.java | 42 ++-
.../service/balancer/ClusterBalancer.java | 27 +-
.../balancer/CountBasedStreamChooser.java | 9 +-
.../service/balancer/LimitedStreamChooser.java | 10 +
.../service/balancer/SimpleBalancer.java | 17 +-
.../service/balancer/StreamChooser.java | 2 +-
.../service/balancer/StreamMover.java | 5 +-
.../service/balancer/StreamMoverImpl.java | 2 +-
.../service/balancer/package-info.java | 21 ++
.../config/DefaultStreamConfigProvider.java | 29 +-
.../service/config/ServerConfiguration.java | 106 ++++---
.../config/ServiceStreamConfigProvider.java | 19 +-
.../service/config/package-info.java | 21 ++
.../distributedlog/service/package-info.java | 21 ++
.../service/placement/EqualLoadAppraiser.java | 26 +-
.../placement/LeastLoadPlacementPolicy.java | 311 ++++++++++---------
.../service/placement/LoadAppraiser.java | 18 +-
.../service/placement/PlacementPolicy.java | 194 ++++++------
.../placement/PlacementStateManager.java | 80 +++--
.../service/placement/ServerLoad.java | 227 +++++++-------
.../service/placement/StreamLoad.java | 141 +++++----
.../placement/ZKPlacementStateManager.java | 236 +++++++-------
.../service/placement/package-info.java | 21 ++
.../service/stream/AbstractStreamOp.java | 23 +-
.../service/stream/AbstractWriteOp.java | 9 +-
.../service/stream/BulkWriteOp.java | 29 +-
.../distributedlog/service/stream/DeleteOp.java | 7 +-
.../service/stream/HeartbeatOp.java | 11 +-
.../service/stream/ReleaseOp.java | 7 +-
.../distributedlog/service/stream/Stream.java | 7 +-
.../service/stream/StreamFactory.java | 3 +
.../service/stream/StreamFactoryImpl.java | 3 +
.../service/stream/StreamImpl.java | 34 +-
.../service/stream/StreamManager.java | 4 +-
.../service/stream/StreamManagerImpl.java | 5 +-
.../distributedlog/service/stream/StreamOp.java | 2 -
.../service/stream/StreamOpStats.java | 2 +-
.../service/stream/TruncateOp.java | 11 +-
.../distributedlog/service/stream/WriteOp.java | 17 +-
.../service/stream/WriteOpWithPayload.java | 3 +
.../service/stream/admin/AdminOp.java | 6 +-
.../service/stream/admin/CreateOp.java | 7 +-
.../service/stream/admin/StreamAdminOp.java | 5 +-
.../service/stream/admin/package-info.java | 21 ++
.../stream/limiter/DynamicRequestLimiter.java | 21 +-
.../stream/limiter/RequestLimiterBuilder.java | 24 +-
.../stream/limiter/ServiceRequestLimiter.java | 8 +-
.../stream/limiter/StreamAcquireLimiter.java | 6 +-
.../stream/limiter/StreamRequestLimiter.java | 5 +-
.../service/stream/limiter/package-info.java | 21 ++
.../service/stream/package-info.java | 21 ++
.../CacheableStreamPartitionConverter.java | 3 +
.../DelimiterStreamPartitionConverter.java | 2 +-
.../service/streamset/Partition.java | 1 +
.../service/streamset/PartitionMap.java | 3 +
.../service/streamset/package-info.java | 21 ++
.../distributedlog/service/tools/ProxyTool.java | 30 +-
.../service/tools/package-info.java | 21 ++
.../service/utils/package-info.java | 21 ++
.../stats/CodahaleMetricsServletProvider.java | 4 +-
.../HealthCheckServletContextListener.java | 2 +-
.../stats/MetricsServletContextListener.java | 3 +
.../bookkeeper/stats/ServletReporter.java | 2 +-
.../apache/bookkeeper/stats/package-info.java | 21 ++
.../client/routing/LocalRoutingService.java | 7 +-
.../service/DistributedLogServerTestCase.java | 33 +-
.../service/TestDistributedLogServerBase.java | 62 ++--
.../TestDistributedLogServerClientRouting.java | 9 +-
.../service/TestDistributedLogService.java | 88 +++---
.../service/TestRegionUnavailable.java | 22 +-
.../distributedlog/service/TestStatsFilter.java | 11 +-
.../service/balancer/TestBalancerUtils.java | 8 +-
.../service/balancer/TestClusterBalancer.java | 26 +-
.../balancer/TestCountBasedStreamChooser.java | 16 +-
.../service/balancer/TestSimpleBalancer.java | 22 +-
.../service/balancer/TestStreamMover.java | 12 +-
.../service/config/TestServerConfiguration.java | 8 +-
.../config/TestStreamConfigProvider.java | 14 +-
.../placement/TestLeastLoadPlacementPolicy.java | 247 ++++++++-------
.../service/placement/TestServerLoad.java | 50 +--
.../service/placement/TestStreamLoad.java | 28 +-
.../placement/TestZKPlacementStateManager.java | 197 ++++++------
.../service/stream/TestStreamManager.java | 24 +-
.../service/stream/TestStreamOp.java | 27 +-
.../limiter/TestServiceRequestLimiter.java | 32 +-
.../TestDelimiterStreamPartitionConverter.java | 6 +-
.../TestIdentityStreamPartitionConverter.java | 8 +-
.../service/streamset/TestPartitionMap.java | 7 +-
105 files changed, 1974 insertions(+), 1400 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
----------------------------------------------------------------------
diff --git a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
index e1117c8..db3549f 100644
--- a/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
+++ b/distributedlog-build-tools/src/main/resources/distributedlog/checkstyle.xml
@@ -260,7 +260,7 @@ page at http://checkstyle.sourceforge.net/config.html -->
T, K, V, W, X or else be capital-case terminated with a T,
such as MyGenericParameterT -->
<module name="ClassTypeParameterName">
- <property name="format" value="^(((T|K|V|W|X)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
+ <property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*))$"/>
<property name="severity" value="error"/>
</module>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/pom.xml
----------------------------------------------------------------------
diff --git a/distributedlog-service/pom.xml b/distributedlog-service/pom.xml
index 052ce15..154dedb 100644
--- a/distributedlog-service/pom.xml
+++ b/distributedlog-service/pom.xml
@@ -197,6 +197,39 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <dependencies>
+ <dependency>
+ <groupId>com.puppycrawl.tools</groupId>
+ <artifactId>checkstyle</artifactId>
+ <version>6.19</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>distributedlog-build-tools</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+ <configuration>
+ <configLocation>distributedlog/checkstyle.xml</configLocation>
+ <suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
+ <consoleOutput>true</consoleOutput>
+ <failOnViolation>true</failOnViolation>
+ <includeResources>false</includeResources>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>test-compile</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
<profiles>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
index dd9961d..da36014 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ClientUtils.java
@@ -21,6 +21,9 @@ import com.twitter.distributedlog.client.DistributedLogClientImpl;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
import org.apache.commons.lang3.tuple.Pair;
+/**
+ * DistributedLog Client Related Utils.
+ */
public class ClientUtils {
public static Pair<DistributedLogClient, MonitorServiceClient> buildClient(DistributedLogClientBuilder builder) {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
index a2a0ca6..029c822 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogCluster.java
@@ -25,6 +25,13 @@ import com.twitter.distributedlog.metadata.DLMetadata;
import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.finagle.builder.Server;
+import java.io.File;
+import java.net.BindException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim;
import org.apache.bookkeeper.stats.NullStatsProvider;
@@ -35,14 +42,6 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.net.BindException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
/**
* DistributedLog Cluster is an emulator to run distributedlog components.
*/
@@ -59,18 +58,18 @@ public class DistributedLogCluster {
*/
public static class Builder {
- int _numBookies = 3;
- boolean _shouldStartZK = true;
- String _zkHost = "127.0.0.1";
- int _zkPort = 0;
- boolean _shouldStartProxy = true;
- int _proxyPort = 7000;
- boolean _thriftmux = false;
- DistributedLogConfiguration _dlConf = new DistributedLogConfiguration()
+ int numBookies = 3;
+ boolean shouldStartZK = true;
+ String zkHost = "127.0.0.1";
+ int zkPort = 0;
+ boolean shouldStartProxy = true;
+ int proxyPort = 7000;
+ boolean thriftmux = false;
+ DistributedLogConfiguration dlConf = new DistributedLogConfiguration()
.setLockTimeout(10)
.setOutputBufferSize(0)
.setImmediateFlushEnabled(true);
- ServerConfiguration _bkConf = new ServerConfiguration();
+ ServerConfiguration bkConf = new ServerConfiguration();
private Builder() {}
@@ -80,7 +79,7 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder numBookies(int numBookies) {
- this._numBookies = numBookies;
+ this.numBookies = numBookies;
return this;
}
@@ -92,7 +91,7 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder shouldStartZK(boolean startZK) {
- this._shouldStartZK = startZK;
+ this.shouldStartZK = startZK;
return this;
}
@@ -104,7 +103,7 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder zkServers(String zkServers) {
- this._zkHost = zkServers;
+ this.zkHost = zkServers;
return this;
}
@@ -116,7 +115,7 @@ public class DistributedLogCluster {
* @return builder.
*/
public Builder zkPort(int zkPort) {
- this._zkPort = zkPort;
+ this.zkPort = zkPort;
return this;
}
@@ -128,7 +127,7 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder shouldStartProxy(boolean startProxy) {
- this._shouldStartProxy = startProxy;
+ this.shouldStartProxy = startProxy;
return this;
}
@@ -140,60 +139,63 @@ public class DistributedLogCluster {
* @return builder
*/
public Builder proxyPort(int proxyPort) {
- this._proxyPort = proxyPort;
+ this.proxyPort = proxyPort;
return this;
}
/**
- * DistributedLog Configuration
+ * Set the distributedlog configuration.
*
* @param dlConf
* distributedlog configuration
* @return builder
*/
public Builder dlConf(DistributedLogConfiguration dlConf) {
- this._dlConf = dlConf;
+ this.dlConf = dlConf;
return this;
}
/**
- * Bookkeeper server configuration
+ * Set the Bookkeeper server configuration.
*
* @param bkConf
* bookkeeper server configuration
* @return builder
*/
public Builder bkConf(ServerConfiguration bkConf) {
- this._bkConf = bkConf;
+ this.bkConf = bkConf;
return this;
}
/**
- * Enable thriftmux for the dl server
+ * Enable thriftmux for the dl server.
*
* @param enabled flag to enable thriftmux
* @return builder
*/
public Builder thriftmux(boolean enabled) {
- this._thriftmux = enabled;
+ this.thriftmux = enabled;
return this;
}
public DistributedLogCluster build() throws Exception {
// build the cluster
return new DistributedLogCluster(
- _dlConf,
- _bkConf,
- _numBookies,
- _shouldStartZK,
- _zkHost,
- _zkPort,
- _shouldStartProxy,
- _proxyPort,
- _thriftmux);
+ dlConf,
+ bkConf,
+ numBookies,
+ shouldStartZK,
+ zkHost,
+ zkPort,
+ shouldStartProxy,
+ proxyPort,
+ thriftmux);
}
}
+ /**
+ * Run a distributedlog proxy server.
+ */
public static class DLServer {
static final int MAX_RETRIES = 20;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
index a9ba125..248bcf7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServer.java
@@ -17,32 +17,8 @@
*/
package com.twitter.distributedlog.service;
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import scala.Option;
-import scala.Tuple2;
-
import com.google.common.base.Optional;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.bookkeeper.stats.NullStatsLogger;
-import org.apache.bookkeeper.stats.StatsLogger;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.config.DynamicConfigurationFactory;
@@ -72,10 +48,33 @@ import com.twitter.finagle.thrift.ClientIdRequiredFilter;
import com.twitter.finagle.thrift.ThriftServerFramedCodec;
import com.twitter.finagle.transport.Transport;
import com.twitter.util.Duration;
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.NullStatsLogger;
+import org.apache.bookkeeper.stats.StatsLogger;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.bookkeeper.util.ReflectionUtils;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.Tuple2;
+/**
+ * Running the distributedlog proxy server.
+ */
public class DistributedLogServer {
- static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServer.class);
private static final String DEFAULT_LOAD_APPRIASER = EqualLoadAppraiser.class.getCanonicalName();
private DistributedLogServiceImpl dlService = null;
@@ -124,7 +123,8 @@ public class DistributedLogServer {
this.loadAppraiserClassStr = loadAppraiserClass;
}
- public void runServer() throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
+ public void runServer()
+ throws ConfigurationException, IllegalArgumentException, IOException, ClassNotFoundException {
if (!uri.isPresent()) {
throw new IllegalArgumentException("No distributedlog uri provided.");
}
@@ -135,7 +135,8 @@ public class DistributedLogServer {
try {
dlConf.loadConf(new File(configFile).toURI().toURL());
} catch (ConfigurationException e) {
- throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + ".");
+ throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+ + configFile + ".");
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+ configFile + ".");
@@ -185,7 +186,8 @@ public class DistributedLogServer {
}
Class loadAppraiserClass = Class.forName(loadAppraiserClassStr.or(DEFAULT_LOAD_APPRIASER));
LoadAppraiser loadAppraiser = (LoadAppraiser) ReflectionUtils.newInstance(loadAppraiserClass);
- logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get() + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
+ logger.info("Supplied load appraiser class is " + loadAppraiserClassStr.get()
+ + " Instantiated " + loadAppraiser.getClass().getCanonicalName());
StreamConfigProvider streamConfProvider =
getStreamConfigProvider(dlConf, converter);
@@ -227,7 +229,8 @@ public class DistributedLogServer {
}
}
- private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf) throws ConfigurationException {
+ private DynamicDistributedLogConfiguration getServiceDynConf(DistributedLogConfiguration dlConf)
+ throws ConfigurationException {
Optional<DynamicDistributedLogConfiguration> dynConf = Optional.absent();
if (conf.isPresent()) {
DynamicConfigurationFactory configFactory = new DynamicConfigurationFactory(
@@ -341,7 +344,8 @@ public class DistributedLogServer {
logger.info("Using thriftmux.");
Tuple2<Transport.Liveness, Stack.Param<Transport.Liveness>> livenessParam = new Transport.Liveness(
Duration.Top(), Duration.Top(), Option.apply((Object) Boolean.valueOf(true))).mk();
- serverBuilder = serverBuilder.stack(ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
+ serverBuilder = serverBuilder.stack(
+ ThriftMuxServer$.MODULE$.configured(livenessParam._1(), livenessParam._2()));
}
logger.info("DistributedLogServer running with the following configuration : \n{}", dlConf.getPropsAsString());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
index 1c3d8d4..55ed84f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServerApp.java
@@ -17,15 +17,26 @@
*/
package com.twitter.distributedlog.service;
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
import com.google.common.base.Function;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.client.routing.RoutingUtils;
import com.twitter.distributedlog.client.serverset.DLZkServerSet;
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
+import java.io.File;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
@@ -38,21 +49,14 @@ import org.apache.commons.configuration.ConfigurationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import static com.twitter.distributedlog.util.CommandLineUtils.*;
-
+/**
+ * The launcher of the distributedlog proxy server.
+ */
public class DistributedLogServerApp {
- static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServerApp.class);
- private final static String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
+ private static final String USAGE = "DistributedLogServerApp [-u <uri>] [-c <conf>]";
private final String[] args;
private final Options options = new Options();
@@ -104,7 +108,8 @@ public class DistributedLogServerApp {
}
}
- private void runCmd(CommandLine cmdline) throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
+ private void runCmd(CommandLine cmdline)
+ throws IllegalArgumentException, IOException, ConfigurationException, ClassNotFoundException {
final StatsReceiver statsReceiver = NullStatsReceiver.get();
Optional<String> confOptional = getOptionalStringArg(cmdline, "c");
DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
@@ -113,7 +118,8 @@ public class DistributedLogServerApp {
try {
dlConf.loadConf(new File(configFile).toURI().toURL());
} catch (ConfigurationException e) {
- throw new IllegalArgumentException("Failed to load distributedlog configuration from " + configFile + ".");
+ throw new IllegalArgumentException("Failed to load distributedlog configuration from "
+ + configFile + ".");
} catch (MalformedURLException e) {
throw new IllegalArgumentException("Failed to load distributedlog configuration from malformed "
+ configFile + ".");
@@ -130,7 +136,7 @@ public class DistributedLogServerApp {
}).or(new NullStatsProvider());
final Optional<String> uriOption = getOptionalStringArg(cmdline, "u");
- Preconditions.checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
+ checkArgument(uriOption.isPresent(), "No distributedlog uri provided.");
URI dlUri = URI.create(uriOption.get());
DLZkServerSet serverSet = DLZkServerSet.of(dlUri, (int) TimeUnit.SECONDS.toMillis(60));
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 5dee7fd..db1346e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -47,7 +47,6 @@ import com.twitter.distributedlog.service.placement.PlacementPolicy;
import com.twitter.distributedlog.service.placement.ZKPlacementStateManager;
import com.twitter.distributedlog.service.stream.BulkWriteOp;
import com.twitter.distributedlog.service.stream.DeleteOp;
-import com.twitter.distributedlog.service.stream.admin.CreateOp;
import com.twitter.distributedlog.service.stream.HeartbeatOp;
import com.twitter.distributedlog.service.stream.ReleaseOp;
import com.twitter.distributedlog.service.stream.Stream;
@@ -59,8 +58,8 @@ import com.twitter.distributedlog.service.stream.StreamOp;
import com.twitter.distributedlog.service.stream.StreamOpStats;
import com.twitter.distributedlog.service.stream.TruncateOp;
import com.twitter.distributedlog.service.stream.WriteOp;
-import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
+import com.twitter.distributedlog.service.stream.admin.CreateOp;
import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
@@ -86,7 +85,6 @@ import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.ScheduledThreadPoolTimer;
import com.twitter.util.Timer;
-
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -96,7 +94,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
@@ -105,15 +102,17 @@ import org.apache.bookkeeper.stats.StatsLogger;
import org.jboss.netty.util.HashedWheelTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.runtime.BoxedUnit;
+/**
+ * Implementation of distributedlog thrift service.
+ */
public class DistributedLogServiceImpl implements DistributedLogService.ServiceIface,
FatalErrorHandler {
- static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
+ private static final Logger logger = LoggerFactory.getLogger(DistributedLogServiceImpl.class);
- private final int MOVING_AVERAGE_WINDOW_SECS = 60;
+ private static final int MOVING_AVERAGE_WINDOW_SECS = 60;
private final ServerConfiguration serverConfig;
private final DistributedLogConfiguration dlConfig;
@@ -294,8 +293,8 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
@Override
public Number getSample() {
- return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable() ?
- 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
+ return ServerStatus.DOWN == serverStatus ? -1 : (featureRegionStopAcceptNewStream.isAvailable()
+ ? 3 : (ServerStatus.WRITE_AND_ACCEPT == serverStatus ? 1 : 2));
}
};
this.movingAvgRpsGauge = new Gauge<Number>() {
@@ -364,8 +363,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
streamsStatsLogger.registerGauge("cached", this.streamCachedGauge);
// Setup complete
- logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {}, dlsn version {}.",
- new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
+ logger.info("Running distributedlog server : client id {}, allocator pool {}, perstream stat {},"
+ + " dlsn version {}.",
+ new Object[] { clientId, allocatorPoolName, serverConf.isPerStreamStatEnabled(), dlsnVersion });
}
private void countStatusCode(StatusCode code) {
@@ -440,7 +440,9 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
}
@Override
- public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) {
+ public Future<BulkWriteResponse> writeBulkWithContext(final String stream,
+ List<ByteBuffer> data,
+ WriteContext ctx) {
bulkWritePendingStat.inc();
receivedRecordCounter.add(data.size());
BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
@@ -480,8 +482,14 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
@Override
public Future<WriteResponse> truncate(String stream, String dlsn, WriteContext ctx) {
- TruncateOp op = new TruncateOp(stream, DLSN.deserialize(dlsn), statsLogger, perStreamStatsLogger, getChecksum(ctx),
- featureChecksumDisabled, accessControlManager);
+ TruncateOp op = new TruncateOp(
+ stream,
+ DLSN.deserialize(dlsn),
+ statsLogger,
+ perStreamStatsLogger,
+ getChecksum(ctx),
+ featureChecksumDisabled,
+ accessControlManager);
executeStreamOp(op);
return op.result();
}
@@ -730,14 +738,14 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
}
/**
- * clean up the gauge before we close to help GC
+ * clean up the gauge before we close to help GC.
*/
private void unregisterGauge(){
- this.statsLogger.unregisterGauge("proxy_status",this.proxyStatusGauge);
- this.statsLogger.unregisterGauge("moving_avg_rps",this.movingAvgRpsGauge);
- this.statsLogger.unregisterGauge("moving_avg_bps",this.movingAvgBpsGauge);
- this.statsLogger.unregisterGauge("acquired",this.streamAcquiredGauge);
- this.statsLogger.unregisterGauge("cached",this.streamCachedGauge);
+ this.statsLogger.unregisterGauge("proxy_status", this.proxyStatusGauge);
+ this.statsLogger.unregisterGauge("moving_avg_rps", this.movingAvgRpsGauge);
+ this.statsLogger.unregisterGauge("moving_avg_bps", this.movingAvgBpsGauge);
+ this.statsLogger.unregisterGauge("acquired", this.streamAcquiredGauge);
+ this.statsLogger.unregisterGauge("cached", this.streamCachedGauge);
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
index e0a15e6..d6922b9 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/FatalErrorHandler.java
@@ -17,13 +17,6 @@
*/
package com.twitter.distributedlog.service;
-import com.google.common.base.Optional;
-import com.twitter.util.Future;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* Implement handling for an unrecoverable error.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
index 7edb778..4ff5b87 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorService.java
@@ -17,9 +17,9 @@
*/
package com.twitter.distributedlog.service;
+import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Sets;
import com.google.common.hash.HashFunction;
@@ -41,14 +41,6 @@ import com.twitter.finagle.stats.StatsReceiver;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration;
import com.twitter.util.FutureEventListener;
-
-import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.StatsProvider;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.commons.lang.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
@@ -65,10 +57,19 @@ import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.Gauge;
+import org.apache.bookkeeper.stats.StatsProvider;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * Monitor Service.
+ */
public class MonitorService implements NamespaceListener {
- static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
+ private static final Logger logger = LoggerFactory.getLogger(MonitorService.class);
private DistributedLogNamespace dlNamespace = null;
private MonitorServiceClient dlClient = null;
@@ -271,9 +272,9 @@ public class MonitorService implements NamespaceListener {
}
public void runServer() throws IllegalArgumentException, IOException {
- Preconditions.checkArgument(uriArg.isPresent(),
+ checkArgument(uriArg.isPresent(),
"No distributedlog uri provided.");
- Preconditions.checkArgument(serverSetArg.isPresent(),
+ checkArgument(serverSetArg.isPresent(),
"No proxy server set provided.");
if (intervalArg.isPresent()) {
interval = intervalArg.get();
@@ -422,7 +423,7 @@ public class MonitorService implements NamespaceListener {
}
/**
- * Close the server
+ * Close the server.
*/
public void close() {
logger.info("Closing monitor service.");
@@ -459,7 +460,7 @@ public class MonitorService implements NamespaceListener {
}
/**
- * clean up the gauge before we close to help GC
+ * clean up the gauge before we close to help GC.
*/
private void unregisterGauge(){
statsProvider.getStatsLogger("monitor").unregisterGauge("num_streams", numOfStreamsGauge);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
index a51a6a9..b5b4ca8 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/MonitorServiceApp.java
@@ -17,8 +17,13 @@
*/
package com.twitter.distributedlog.service;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalBooleanArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalIntegerArg;
+import static com.twitter.distributedlog.util.CommandLineUtils.getOptionalStringArg;
+
import com.twitter.finagle.stats.NullStatsReceiver;
import com.twitter.finagle.stats.StatsReceiver;
+import java.io.IOException;
import org.apache.bookkeeper.stats.NullStatsProvider;
import org.apache.bookkeeper.stats.StatsProvider;
import org.apache.bookkeeper.util.ReflectionUtils;
@@ -30,15 +35,16 @@ import org.apache.commons.cli.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-
-import static com.twitter.distributedlog.util.CommandLineUtils.*;
+/**
+ * The launcher to run monitor service.
+ */
public class MonitorServiceApp {
- static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
+ private static final Logger logger = LoggerFactory.getLogger(MonitorServiceApp.class);
+
+ static final String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
- final static String USAGE = "MonitorService [-u <uri>] [-c <conf>] [-s serverset]";
final String[] args;
final Options options = new Options();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
index 798dcf5..d779cd0 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/ServerFeatureKeys.java
@@ -18,7 +18,7 @@
package com.twitter.distributedlog.service;
/**
- * List of feature keys used by distributedlog server
+ * List of feature keys used by distributedlog server.
*/
public enum ServerFeatureKeys {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
index 6c570f6..bd0a992 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/StatsFilter.java
@@ -18,17 +18,13 @@
package com.twitter.distributedlog.service;
import com.google.common.base.Stopwatch;
-
import com.twitter.finagle.Service;
import com.twitter.finagle.SimpleFilter;
import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-
-import org.apache.bookkeeper.stats.StatsLogger;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
-
-import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.stats.StatsLogger;
/**
* Track distributedlog server finagle-service stats.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
index 89e0665..cb37088 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/Announcer.java
@@ -20,7 +20,7 @@ package com.twitter.distributedlog.service.announcer;
import java.io.IOException;
/**
- * Announce service information
+ * Announce service information.
*/
public interface Announcer {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
index c686408..471f954 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/NOPAnnouncer.java
@@ -19,6 +19,9 @@ package com.twitter.distributedlog.service.announcer;
import java.io.IOException;
+/**
+ * A no-op implementation of {@link Announcer}.
+ */
public class NOPAnnouncer implements Announcer {
@Override
public void announce() throws IOException {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
index ada8710..eaf4c26 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/ServerSetAnnouncer.java
@@ -20,9 +20,6 @@ package com.twitter.distributedlog.service.announcer;
import com.twitter.common.zookeeper.Group;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.distributedlog.client.serverset.DLZkServerSet;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
@@ -30,10 +27,15 @@ import java.net.URI;
import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * ServerSet based announcer.
+ */
public class ServerSetAnnouncer implements Announcer {
- static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
+ private static final Logger logger = LoggerFactory.getLogger(ServerSetAnnouncer.class);
final String localAddr;
final InetSocketAddress serviceEndpoint;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
new file mode 100644
index 0000000..bca36df
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/announcer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Announcers to announce servers to server set.
+ */
+package com.twitter.distributedlog.service.announcer;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
index 9dd84d0..3ffe54b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/Balancer.java
@@ -20,6 +20,11 @@ package com.twitter.distributedlog.service.balancer;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
+/**
+ * Balancer Interface.
+ *
+ * <p>A balancer is used to balance the streams across the proxy cluster.
+ */
public interface Balancer {
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
index 2e49d92..48430df 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/BalancerTool.java
@@ -17,8 +17,9 @@
*/
package com.twitter.distributedlog.service.balancer;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.google.common.base.Optional;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.common.zookeeper.ServerSet;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
@@ -33,6 +34,8 @@ import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Await;
import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
@@ -40,15 +43,12 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.net.URI;
-
/**
- * Tool to rebalance cluster
+ * Tool to rebalance cluster.
*/
public class BalancerTool extends Tool {
- static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
+ private static final Logger logger = LoggerFactory.getLogger(BalancerTool.class);
static DistributedLogClientBuilder createDistributedLogClientBuilder(ServerSet serverSet) {
return DistributedLogClientBuilder.newBuilder()
@@ -66,6 +66,9 @@ public class BalancerTool extends Tool {
.failFast(false));
}
+ /**
+ * Base Command to run balancer.
+ */
protected abstract static class BalancerCommand extends OptsCommand {
protected Options options = new Options();
@@ -78,7 +81,8 @@ public class BalancerTool extends Tool {
BalancerCommand(String name, String description) {
super(name, description);
options.addOption("rwm", "rebalance-water-mark", true, "Rebalance water mark per proxy");
- options.addOption("rtp", "rebalance-tolerance-percentage", true, "Rebalance tolerance percentage per proxy");
+ options.addOption("rtp", "rebalance-tolerance-percentage", true,
+ "Rebalance tolerance percentage per proxy");
options.addOption("rc", "rebalance-concurrency", true, "Concurrency to rebalance stream distribution");
options.addOption("r", "rate", true, "Rebalance rate");
}
@@ -105,11 +109,11 @@ public class BalancerTool extends Tool {
if (cmdline.hasOption("r")) {
this.rate = Double.parseDouble(cmdline.getOptionValue("r"));
}
- Preconditions.checkArgument(rebalanceWaterMark >= 0,
+ checkArgument(rebalanceWaterMark >= 0,
"Rebalance Water Mark should be a non-negative number");
- Preconditions.checkArgument(rebalanceTolerancePercentage >= 0.0f,
+ checkArgument(rebalanceTolerancePercentage >= 0.0f,
"Rebalance Tolerance Percentage should be a non-negative number");
- Preconditions.checkArgument(rebalanceConcurrency > 0,
+ checkArgument(rebalanceConcurrency > 0,
"Rebalance Concurrency should be a positive number");
if (null == rate || rate <= 0.0f) {
rateLimiter = Optional.absent();
@@ -133,6 +137,9 @@ public class BalancerTool extends Tool {
protected abstract int executeCommand(CommandLine cmdline) throws Exception;
}
+ /**
+ * Command to balance streams within a cluster.
+ */
protected static class ClusterBalancerCommand extends BalancerCommand {
protected URI uri;
@@ -188,7 +195,11 @@ public class BalancerTool extends Tool {
ClusterBalancer balancer)
throws Exception {
if (null == source) {
- balancer.balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, getRateLimiter());
+ balancer.balance(
+ rebalanceWaterMark,
+ rebalanceTolerancePercentage,
+ rebalanceConcurrency,
+ getRateLimiter());
} else {
balanceFromSource(clientBuilder, balancer, source, getRateLimiter());
}
@@ -217,6 +228,9 @@ public class BalancerTool extends Tool {
}
}
+ /**
+ * Command to balance streams between regions.
+ */
protected static class RegionBalancerCommand extends BalancerCommand {
protected URI region1;
@@ -288,7 +302,11 @@ public class BalancerTool extends Tool {
protected int runBalancer(SimpleBalancer balancer) throws Exception {
if (null == source) {
- balancer.balance(rebalanceWaterMark, rebalanceTolerancePercentage, rebalanceConcurrency, getRateLimiter());
+ balancer.balance(
+ rebalanceWaterMark,
+ rebalanceTolerancePercentage,
+ rebalanceConcurrency,
+ getRateLimiter());
} else {
balancer.balanceAll(source, rebalanceConcurrency, getRateLimiter());
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
index 6fef648..3a3dc1f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/ClusterBalancer.java
@@ -28,10 +28,6 @@ import com.twitter.util.Await;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.Serializable;
import java.net.SocketAddress;
import java.util.ArrayList;
@@ -42,13 +38,16 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
- * A balancer balances ownerships with a cluster of targets
+ * A balancer balances ownerships with a cluster of targets.
*/
public class ClusterBalancer implements Balancer {
- static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
+ private static final Logger logger = LoggerFactory.getLogger(ClusterBalancer.class);
/**
* Represent a single host. Ordered by number of streams in desc order.
@@ -205,7 +204,8 @@ public class ClusterBalancer implements Balancer {
}
int moveFromLowWaterMark;
- int moveToHighWaterMark = Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
+ int moveToHighWaterMark =
+ Math.max(1, (int) (averageLoad + averageLoad * rebalanceTolerancePercentage / 100.0f));
if (hostIdxMoveFrom >= 0) {
moveFromLowWaterMark = Math.max(0, rebalanceWaterMark);
@@ -220,7 +220,8 @@ public class ClusterBalancer implements Balancer {
AtomicInteger moveFrom = new AtomicInteger(0);
AtomicInteger moveTo = new AtomicInteger(hosts.size() - 1);
while (moveFrom.get() < moveTo.get()) {
- moveStreams(hosts, moveFrom, moveFromLowWaterMark, moveTo, moveToHighWaterMark, rebalanceRateLimiter);
+ moveStreams(hosts, moveFrom, moveFromLowWaterMark,
+ moveTo, moveToHighWaterMark, rebalanceRateLimiter);
moveFrom.incrementAndGet();
}
}
@@ -244,8 +245,14 @@ public class ClusterBalancer implements Balancer {
}
if (logger.isDebugEnabled()) {
- logger.debug("Moving streams : hosts = {}, from = {}, to = {} : from_low_water_mark = {}, to_high_water_mark = {}",
- new Object[] { hosts, hostIdxMoveFrom.get(), hostIdxMoveTo.get(), moveFromLowWaterMark, moveToHighWaterMark });
+ logger.debug("Moving streams : hosts = {}, from = {}, to = {} :"
+ + " from_low_water_mark = {}, to_high_water_mark = {}",
+ new Object[] {
+ hosts,
+ hostIdxMoveFrom.get(),
+ hostIdxMoveTo.get(),
+ moveFromLowWaterMark,
+ moveToHighWaterMark });
}
Host hostMoveFrom = hosts.get(hostIdxMoveFrom.get());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
index 0a267ea..fab37b3 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/CountBasedStreamChooser.java
@@ -17,8 +17,7 @@
*/
package com.twitter.distributedlog.service.balancer;
-import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.tuple.Pair;
+import static com.google.common.base.Preconditions.checkArgument;
import java.io.Serializable;
import java.net.SocketAddress;
@@ -29,7 +28,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang3.tuple.Pair;
+/**
+ * A stream chooser based on number of streams.
+ */
class CountBasedStreamChooser implements StreamChooser, Serializable,
Comparator<Pair<SocketAddress, LinkedList<String>>> {
@@ -46,7 +49,7 @@ class CountBasedStreamChooser implements StreamChooser, Serializable,
int next;
CountBasedStreamChooser(Map<SocketAddress, Set<String>> streams) {
- Preconditions.checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
+ checkArgument(streams.size() > 0, "Only support no-empty streams distribution");
streamsDistribution = new ArrayList<Pair<SocketAddress, LinkedList<String>>>(streams.size());
for (Map.Entry<SocketAddress, Set<String>> entry : streams.entrySet()) {
LinkedList<String> randomizedStreams = new LinkedList<String>(entry.getValue());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
index d0e294d..069e596 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/LimitedStreamChooser.java
@@ -17,8 +17,18 @@
*/
package com.twitter.distributedlog.service.balancer;
+/**
+ * A stream chooser that can only choose a limited number of streams.
+ */
public class LimitedStreamChooser implements StreamChooser {
+ /**
+ * Create a limited stream chooser by {@code limit}.
+ *
+ * @param underlying the underlying stream chooser.
+ * @param limit the limit of number of streams to choose.
+ * @return the limited stream chooser.
+ */
public static LimitedStreamChooser of(StreamChooser underlying, int limit) {
return new LimitedStreamChooser(underlying, limit);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
index 6913e4a..b205d5f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/SimpleBalancer.java
@@ -21,9 +21,6 @@ import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
import com.twitter.distributedlog.service.DistributedLogClient;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
@@ -31,13 +28,15 @@ import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A balancer balances ownerships between two targets.
*/
public class SimpleBalancer implements Balancer {
- static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
+ private static final Logger logger = LoggerFactory.getLogger(SimpleBalancer.class);
protected final String target1;
protected final String target2;
@@ -120,8 +119,8 @@ public class SimpleBalancer implements Balancer {
loadDistribution.put(target, targetStreamCount);
// Calculate how many streams to be rebalanced from src region to target region
- int numStreamsToRebalance =
- BalancerUtils.calculateNumStreamsToRebalance(source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
+ int numStreamsToRebalance = BalancerUtils.calculateNumStreamsToRebalance(
+ source, loadDistribution, rebalanceWaterMark, rebalanceTolerancePercentage);
if (numStreamsToRebalance <= 0) {
logger.info("No streams need to be rebalanced from '{}' to '{}'.", source, target);
@@ -130,7 +129,8 @@ public class SimpleBalancer implements Balancer {
StreamChooser streamChooser =
LimitedStreamChooser.of(new CountBasedStreamChooser(srcDistribution), numStreamsToRebalance);
- StreamMover streamMover = new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
+ StreamMover streamMover =
+ new StreamMoverImpl(source, srcClient, srcMonitor, target, targetClient, targetMonitor);
moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
}
@@ -166,7 +166,8 @@ public class SimpleBalancer implements Balancer {
}
StreamChooser streamChooser = new CountBasedStreamChooser(distribution);
- StreamMover streamMover = new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
+ StreamMover streamMover =
+ new StreamMoverImpl(source, sourceClient, sourceMonitor, target, targetClient, targetMonitor);
moveStreams(streamChooser, streamMover, rebalanceConcurrency, rebalanceRateLimiter);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
index e0fcaf1..d92aef0 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamChooser.java
@@ -18,7 +18,7 @@
package com.twitter.distributedlog.service.balancer;
/**
- * Choose a stream to rebalance
+ * Choose a stream to rebalance.
*/
public interface StreamChooser {
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
index 6857560..6e4205b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMover.java
@@ -17,10 +17,13 @@
*/
package com.twitter.distributedlog.service.balancer;
+/**
+ * A stream mover to move streams between proxies.
+ */
public interface StreamMover {
/**
- * Move given stream <i>streamName</i>
+ * Move given stream <i>streamName</i>.
*
* @param streamName
* stream name to move
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
index 75df4ea..fc67fb2 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/StreamMoverImpl.java
@@ -48,7 +48,7 @@ public class StreamMoverImpl implements StreamMover {
}
/**
- * Move given stream <i>streamName</i>
+ * Move given stream <i>streamName</i>.
*
* @param streamName
* stream name to move
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
new file mode 100644
index 0000000..4ae8d44
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/balancer/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Balancer to move streams around to balance the traffic.
+ */
+package com.twitter.distributedlog.service.balancer;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
index d612195..b45b798 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/DefaultStreamConfigProvider.java
@@ -25,34 +25,41 @@ import com.twitter.distributedlog.config.ConfigurationSubscription;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.config.FileConfigurationBuilder;
import com.twitter.distributedlog.config.PropertiesConfigurationBuilder;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* For all streams return the same dynamic config based on configFile.
*/
public class DefaultStreamConfigProvider implements StreamConfigProvider {
- static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamConfigProvider.class);
private final Optional<DynamicDistributedLogConfiguration> dynConf;
private final ConfigurationSubscription confSub;
- public DefaultStreamConfigProvider(String configFilePath, ScheduledExecutorService executorService, int reloadPeriod,
- TimeUnit reloadUnit) throws ConfigurationException {
+ public DefaultStreamConfigProvider(String configFilePath,
+ ScheduledExecutorService executorService,
+ int reloadPeriod,
+ TimeUnit reloadUnit)
+ throws ConfigurationException {
try {
File configFile = new File(configFilePath);
- FileConfigurationBuilder properties = new PropertiesConfigurationBuilder(configFile.toURI().toURL());
- ConcurrentConstConfiguration defaultConf = new ConcurrentConstConfiguration(new DistributedLogConfiguration());
- DynamicDistributedLogConfiguration conf = new DynamicDistributedLogConfiguration(defaultConf);
+ FileConfigurationBuilder properties =
+ new PropertiesConfigurationBuilder(configFile.toURI().toURL());
+ ConcurrentConstConfiguration defaultConf =
+ new ConcurrentConstConfiguration(new DistributedLogConfiguration());
+ DynamicDistributedLogConfiguration conf =
+ new DynamicDistributedLogConfiguration(defaultConf);
List<FileConfigurationBuilder> fileConfigBuilders = Lists.newArrayList(properties);
- confSub = new ConfigurationSubscription(conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit);
+ confSub = new ConfigurationSubscription(
+ conf, fileConfigBuilders, executorService, reloadPeriod, reloadUnit);
this.dynConf = Optional.of(conf);
} catch (MalformedURLException ex) {
throw new ConfigurationException(ex);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
index 5b19f6c..b3b4c4e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServerConfiguration.java
@@ -17,7 +17,8 @@
*/
package com.twitter.distributedlog.service.config;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkArgument;
+
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogConstants;
@@ -29,7 +30,7 @@ import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.SystemConfiguration;
/**
- * Configuration for DistributedLog Server
+ * Configuration for DistributedLog Server.
*/
public class ServerConfiguration extends CompositeConfiguration {
@@ -43,36 +44,36 @@ public class ServerConfiguration extends CompositeConfiguration {
}
// Server DLSN version
- protected final static String SERVER_DLSN_VERSION = "server_dlsn_version";
- protected final static byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
+ protected static final String SERVER_DLSN_VERSION = "server_dlsn_version";
+ protected static final byte SERVER_DLSN_VERSION_DEFAULT = DLSN.VERSION1;
// Server Durable Write Enable/Disable Flag
- protected final static String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled";
- protected final static boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
+ protected static final String SERVER_DURABLE_WRITE_ENABLED = "server_durable_write_enabled";
+ protected static final boolean SERVER_DURABLE_WRITE_ENABLED_DEFAULT = true;
// Server Region Id
- protected final static String SERVER_REGION_ID = "server_region_id";
- protected final static int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID;
+ protected static final String SERVER_REGION_ID = "server_region_id";
+ protected static final int SERVER_REGION_ID_DEFAULT = DistributedLogConstants.LOCAL_REGION_ID;
// Server Port
- protected final static String SERVER_PORT = "server_port";
- protected final static int SERVER_PORT_DEFAULT = 0;
+ protected static final String SERVER_PORT = "server_port";
+ protected static final int SERVER_PORT_DEFAULT = 0;
// Server Shard Id
- protected final static String SERVER_SHARD_ID = "server_shard";
- protected final static int SERVER_SHARD_ID_DEFAULT = -1;
+ protected static final String SERVER_SHARD_ID = "server_shard";
+ protected static final int SERVER_SHARD_ID_DEFAULT = -1;
// Server Threads
- protected final static String SERVER_NUM_THREADS = "server_threads";
- protected final static int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
+ protected static final String SERVER_NUM_THREADS = "server_threads";
+ protected static final int SERVER_NUM_THREADS_DEFAULT = Runtime.getRuntime().availableProcessors();
// Server enable per stream stat
- protected final static String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat";
- protected final static boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
+ protected static final String SERVER_ENABLE_PERSTREAM_STAT = "server_enable_perstream_stat";
+ protected static final boolean SERVER_ENABLE_PERSTREAM_STAT_DEFAULT = true;
// Server graceful shutdown period (in millis)
- protected final static String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms";
- protected final static long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L;
+ protected static final String SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS = "server_graceful_shutdown_period_ms";
+ protected static final long SERVER_GRACEFUL_SHUTDOWN_PERIOD_MS_DEFAULT = 0L;
// Server service timeout
public static final String SERVER_SERVICE_TIMEOUT_MS = "server_service_timeout_ms";
@@ -86,17 +87,18 @@ public class ServerConfiguration extends CompositeConfiguration {
// Server stream probation timeout
public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS = "server_stream_probation_timeout_ms";
public static final String SERVER_STREAM_PROBATION_TIMEOUT_MS_OLD = "streamProbationTimeoutMs";
- public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60*1000*5;
+ public static final long SERVER_STREAM_PROBATION_TIMEOUT_MS_DEFAULT = 60 * 1000 * 5;
// Server stream to partition converter
- protected final static String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class";
+ protected static final String SERVER_STREAM_PARTITION_CONVERTER_CLASS = "stream_partition_converter_class";
// Use hostname as the allocator pool name
- protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME
- = "server_use_hostname_as_allocator_pool_name";
+ protected static final String SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME =
+ "server_use_hostname_as_allocator_pool_name";
protected static final boolean SERVER_USE_HOSTNAME_AS_ALLOCATOR_POOL_NAME_DEFAULT = false;
//Configure refresh interval for calculating resource placement in seconds
- public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S = "server_resource_placement_refresh_interval_sec";
+ public static final String SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_S =
+ "server_resource_placement_refresh_interval_sec";
public static final int SERVER_RESOURCE_PLACEMENT_REFRESH_INTERVAL_DEFAULT = 120;
public ServerConfiguration() {
@@ -105,7 +107,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Load configurations from {@link DistributedLogConfiguration}
+ * Load configurations from {@link DistributedLogConfiguration}.
*
* @param dlConf
* distributedlog configuration
@@ -137,7 +139,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set the flag to enable/disable durable write
+ * Set the flag to enable/disable durable write.
*
* @param enabled
* flag to enable/disable durable write
@@ -149,7 +151,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Is durable write enabled?
+ * Is durable write enabled.
*
* @return true if waiting writes to be durable. otherwise false.
*/
@@ -158,7 +160,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set the region id used to instantiate DistributedLogNamespace
+ * Set the region id used to instantiate DistributedLogNamespace.
*
* @param regionId
* region id
@@ -170,8 +172,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get the region id used to instantiate
- * {@link com.twitter.distributedlog.namespace.DistributedLogNamespace}
+ * Get the region id used to instantiate {@link com.twitter.distributedlog.namespace.DistributedLogNamespace}.
*
* @return region id used to instantiate DistributedLogNamespace
*/
@@ -213,8 +214,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get the shard id of this server. It would be used to instantiate the client id
- * used for DistributedLogNamespace.
+ * Get the shard id of this server.
+ *
+ * <p>It would be used to instantiate the client id used for DistributedLogNamespace.
*
* @return shard id of this server.
*/
@@ -286,7 +288,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get timeout for stream op execution in proxy layer. 0 disables timeout.
+ * Get timeout for stream op execution in proxy layer.
+ *
+ * <p>0 disables timeout.
*
* @return timeout for stream operation in proxy layer.
*/
@@ -296,7 +300,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set timeout for stream op execution in proxy layer. 0 disables timeout.
+ * Set timeout for stream op execution in proxy layer.
+ *
+ * <p>0 disables timeout.
*
* @param timeoutMs
* timeout for stream operation in proxy layer.
@@ -308,7 +314,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get timeout for closing writer in proxy layer. 0 disables timeout.
+ * Get timeout for closing writer in proxy layer.
+ *
+ * <p>0 disables timeout.
*
* @return timeout for closing writer in proxy layer.
*/
@@ -317,7 +325,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Set timeout for closing writer in proxy layer. 0 disables timeout.
+ * Set timeout for closing writer in proxy layer.
+ *
+ * <p>0 disables timeout.
*
* @param timeoutMs
* timeout for closing writer in proxy layer.
@@ -329,8 +339,9 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * After service timeout, how long should stream be kept in cache in probationary state in order
- * to prevent reacquire. In millisec.
+ * How long should stream be kept in cache in probationary state after service timeout.
+ *
+ * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
*
* @return stream probation timeout in ms.
*/
@@ -340,10 +351,12 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * After service timeout, how long should stream be kept in cache in probationary state in order
- * to prevent reacquire. In millisec.
+ * How long should stream be kept in cache in probationary state after service timeout.
+ *
+ * <p>The setting is to prevent reacquire. The unit of this setting is milliseconds.
*
* @param timeoutMs probation timeout in ms.
+ * @return server configuration
*/
public ServerConfiguration setStreamProbationTimeoutMs(long timeoutMs) {
setProperty(SERVER_STREAM_PROBATION_TIMEOUT_MS, timeoutMs);
@@ -357,7 +370,8 @@ public class ServerConfiguration extends CompositeConfiguration {
* stream partition converter class
* @return server configuration
*/
- public ServerConfiguration setStreamPartitionConverterClass(Class<? extends StreamPartitionConverter> converterClass) {
+ public ServerConfiguration setStreamPartitionConverterClass(
+ Class<? extends StreamPartitionConverter> converterClass) {
setProperty(SERVER_STREAM_PARTITION_CONVERTER_CLASS, converterClass.getName());
return this;
}
@@ -391,7 +405,7 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Get if use hostname as the allocator pool name
+ * Get if use hostname as the allocator pool name.
*
* @return true if use hostname as the allocator pool name. otherwise, use
* {@link #getServerShardId()} as the allocator pool name.
@@ -412,15 +426,17 @@ public class ServerConfiguration extends CompositeConfiguration {
}
/**
- * Validate the configuration
+ * Validate the configuration.
+ *
+ * @throws IllegalArgumentException when there are any invalid settings.
*/
public void validate() {
byte dlsnVersion = getDlsnVersion();
- Preconditions.checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1,
+ checkArgument(dlsnVersion >= DLSN.VERSION0 && dlsnVersion <= DLSN.VERSION1,
"Unknown dlsn version " + dlsnVersion);
- Preconditions.checkArgument(getServerThreads() > 0,
+ checkArgument(getServerThreads() > 0,
"Invalid number of server threads : " + getServerThreads());
- Preconditions.checkArgument(getServerShardId() >= 0,
+ checkArgument(getServerShardId() >= 0,
"Invalid server shard id : " + getServerShardId());
}
[2/4] incubator-distributedlog git commit: DL-132: Enable check style
for distributedlog service module.
Posted by si...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
index 36904fd..55c1b48 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamImpl.java
@@ -20,11 +20,11 @@ package com.twitter.distributedlog.service.stream;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.DistributedLogManager;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.OverCapacityException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
@@ -51,6 +51,12 @@ import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import com.twitter.util.TimeoutException;
import com.twitter.util.Timer;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
@@ -65,20 +71,17 @@ import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+/**
+ * Implementation of {@link Stream}.
+ */
public class StreamImpl implements Stream {
- static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
/**
* The status of the stream.
*
- * The status change of the stream should just go in one direction. If a stream hits
+ * <p>The status change of the stream should just go in one direction. If a stream hits
* any error, the stream should be put in error state. If a stream is in error state,
* it should be removed and not reused anymore.
*/
@@ -405,7 +408,7 @@ public class StreamImpl implements Stream {
// Stream is closed, fail the op immediately
op.fail(new StreamUnavailableException("Stream " + name + " is closed."));
return;
- } if (StreamStatus.INITIALIZED == status) {
+ } else if (StreamStatus.INITIALIZED == status) {
completeOpNow = true;
success = true;
} else if (failFastOnStreamNotReady) {
@@ -551,7 +554,8 @@ public class StreamImpl implements Stream {
Future<Boolean> acquireStream() {
final Stopwatch stopwatch = Stopwatch.createStarted();
final Promise<Boolean> acquirePromise = new Promise<Boolean>();
- manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
+ manager.openAsyncLogWriter().addEventListener(
+ FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() {
@Override
public void onSuccess(AsyncLogWriter w) {
@@ -748,8 +752,8 @@ public class StreamImpl implements Stream {
final boolean abort;
closeLock.writeLock().lock();
try {
- if (StreamStatus.CLOSING == status ||
- StreamStatus.CLOSED == status) {
+ if (StreamStatus.CLOSING == status
+ || StreamStatus.CLOSED == status) {
return closePromise;
}
logger.info("Request to close stream {} : {}", getStreamName(), reason);
@@ -875,7 +879,7 @@ public class StreamImpl implements Stream {
}
/**
- * clean up the gauge to help GC
+ * clean up the gauge to help GC.
*/
private void unregisterGauge(){
streamLogger.unregisterGauge("stream_status", this.streamStatusGauge);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
index e171e46..7f7d44e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManager.java
@@ -26,9 +26,9 @@ import java.util.Map;
/**
* Manage lifecycle of streams.
*
- * StreamManager is responsible for creating, destroying, and keeping track of Stream objects.
+ * <p>StreamManager is responsible for creating, destroying, and keeping track of Stream objects.
*
- * Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the
+ * <p>Stream objects, which are managed by StreamManager and created by StreamFactory, are essentially the
* per stream request handlers, responsible for dispatching e.g. write requests to an underlying AsyncLogWriter,
* managing stream lock, interpreting exceptions, error conditions, and etc.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
index df336fe..8b36d3b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamManagerImpl.java
@@ -55,13 +55,14 @@ import org.slf4j.LoggerFactory;
* StreamManagerImpl is the default implementation responsible for creating, destroying, and keeping track
* of Streams.
*
- * StreamFactory, supplied to StreamManagerImpl in the constructor below, is reposible simply for creating
+ * <p>StreamFactory, supplied to StreamManagerImpl in the constructor below, is responsible simply for creating
* a stream object in isolation from the rest of the system. We pass a StreamFactory in instead of simply
* creating StreamImpl's ourselves in order to inject dependencies without bloating the StreamManagerImpl
* constructor.
*/
public class StreamManagerImpl implements StreamManager {
- static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(StreamManagerImpl.class);
private final ConcurrentHashMap<String, Stream> streams =
new ConcurrentHashMap<String, Stream>();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java
index de01f9f..a2cbc80 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOp.java
@@ -24,8 +24,6 @@ import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.Future;
-import java.nio.ByteBuffer;
-
/**
* An operation applied to a stream.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
index bfbc88c..c1019c6 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamOpStats.java
@@ -17,8 +17,8 @@
*/
package com.twitter.distributedlog.service.stream;
-import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.distributedlog.service.streamset.Partition;
+import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java
index f453dc2..b0b4df2 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/TruncateOp.java
@@ -19,26 +19,27 @@ package com.twitter.distributedlog.service.stream;
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.distributedlog.acl.AccessControlManager;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.RequestDeniedException;
import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.runtime.AbstractFunction1;
+/**
+ * Operation to truncate a log stream.
+ */
public class TruncateOp extends AbstractWriteOp {
- static final Logger logger = LoggerFactory.getLogger(TruncateOp.class);
+ private static final Logger logger = LoggerFactory.getLogger(TruncateOp.class);
private final Counter deniedTruncateCounter;
private final DLSN dlsn;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
index e9f2f4e..69739dc 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOp.java
@@ -20,35 +20,36 @@ package com.twitter.distributedlog.service.stream;
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.LogRecord;
-import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.RequestDeniedException;
import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.streamset.Partition;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
-import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.StatusCode;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.distributedlog.util.Sequencer;
-import com.twitter.util.FutureEventListener;
import com.twitter.util.Future;
-
+import com.twitter.util.FutureEventListener;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
-
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import scala.runtime.AbstractFunction1;
+/**
+ * Operation to write a single record to a log stream.
+ */
public class WriteOp extends AbstractWriteOp implements WriteOpWithPayload {
- static final Logger logger = LoggerFactory.getLogger(WriteOp.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(WriteOp.class);
private final byte[] payload;
private final boolean isRecordSet;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java
index eef4811..6cc9063 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/WriteOpWithPayload.java
@@ -17,6 +17,9 @@
*/
package com.twitter.distributedlog.service.stream;
+/**
+ * A write operation with payload.
+ */
public interface WriteOpWithPayload {
// Return the payload size in bytes
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
index 7ac4986..6d2d2ea 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
@@ -21,9 +21,9 @@ import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.util.Future;
/**
- * An admin operation
+ * Admin operation interface.
*/
-public interface AdminOp<Response> {
+public interface AdminOp<RespT> {
/**
* Invoked before the stream op is executed.
@@ -35,6 +35,6 @@ public interface AdminOp<Response> {
*
* @return the future represents the response of the operation
*/
- Future<Response> execute();
+ Future<RespT> execute();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
index 2e1f490..478201e 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
@@ -17,6 +17,8 @@
*/
package com.twitter.distributedlog.service.stream.admin;
+import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat;
+
import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.service.stream.StreamManager;
import com.twitter.distributedlog.thrift.service.WriteResponse;
@@ -25,8 +27,9 @@ import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.StatsLogger;
import scala.runtime.AbstractFunction1;
-import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat;
-
+/**
+ * Operation to create log stream.
+ */
public class CreateOp extends StreamAdminOp {
public CreateOp(String stream,
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
index 37c6e14..4fad542 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -26,13 +26,12 @@ import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.util.Future;
import com.twitter.util.FutureTransformer;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.OpStatsLogger;
-import java.util.concurrent.TimeUnit;
-
/**
- * Stream admin op
+ * Stream admin op.
*/
public abstract class StreamAdminOp implements AdminOp<WriteResponse> {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java
new file mode 100644
index 0000000..5ec997c
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream Related Admin Operations.
+ */
+package com.twitter.distributedlog.service.stream.admin;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
index be8c457..d684de5 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/DynamicRequestLimiter.java
@@ -20,32 +20,31 @@ package com.twitter.distributedlog.service.stream.limiter;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.OverCapacityException;
import com.twitter.distributedlog.limiter.RequestLimiter;
-
import java.io.Closeable;
-
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.configuration.event.ConfigurationEvent;
import org.apache.commons.configuration.event.ConfigurationListener;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Dynamically rebuild a rate limiter when the supplied dynamic config changes. Subclasses
- * implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister
+ * Dynamically rebuild a rate limiter when the supplied dynamic config changes.
+ *
+ * <p>Subclasses implement build() to build the limiter. DynamicRequestLimiter must be closed to deregister
* the config listener.
*/
-public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<Request>, Closeable {
- static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class);
+public abstract class DynamicRequestLimiter<Req> implements RequestLimiter<Req>, Closeable {
+ private static final Logger LOG = LoggerFactory.getLogger(DynamicRequestLimiter.class);
private final ConfigurationListener listener;
private final Feature rateLimitDisabledFeature;
- volatile RequestLimiter<Request> limiter;
+ volatile RequestLimiter<Req> limiter;
final DynamicDistributedLogConfiguration dynConf;
public DynamicRequestLimiter(DynamicDistributedLogConfiguration dynConf,
- StatsLogger statsLogger, Feature rateLimitDisabledFeature) {
+ StatsLogger statsLogger,
+ Feature rateLimitDisabledFeature) {
final StatsLogger limiterStatsLogger = statsLogger.scope("dynamic");
this.dynConf = dynConf;
this.rateLimitDisabledFeature = rateLimitDisabledFeature;
@@ -74,7 +73,7 @@ public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<R
}
@Override
- public void apply(Request request) throws OverCapacityException {
+ public void apply(Req request) throws OverCapacityException {
if (rateLimitDisabledFeature.isAvailable()) {
return;
}
@@ -91,5 +90,5 @@ public abstract class DynamicRequestLimiter<Request> implements RequestLimiter<R
* Build the underlying limiter. Called when DynamicRequestLimiter detects config has changed.
* This may be called multiple times so the method should be cheap.
*/
- protected abstract RequestLimiter<Request> build();
+ protected abstract RequestLimiter<Req> build();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
index 157b1ec..c1a37bb 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/RequestLimiterBuilder.java
@@ -17,26 +17,32 @@
*/
package com.twitter.distributedlog.service.stream.limiter;
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkNotNull;
+
import com.twitter.distributedlog.exceptions.OverCapacityException;
import com.twitter.distributedlog.limiter.ComposableRequestLimiter;
-import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
import com.twitter.distributedlog.limiter.ComposableRequestLimiter.CostFunction;
+import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
import com.twitter.distributedlog.limiter.GuavaRateLimiter;
import com.twitter.distributedlog.limiter.RateLimiter;
import com.twitter.distributedlog.limiter.RequestLimiter;
import com.twitter.distributedlog.service.stream.StreamOp;
import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
-
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
+/**
+ * Request limiter builder.
+ */
public class RequestLimiterBuilder {
private OverlimitFunction<StreamOp> overlimitFunction = NOP_OVERLIMIT_FUNCTION;
private RateLimiter limiter;
private CostFunction<StreamOp> costFunction;
private StatsLogger statsLogger = NullStatsLogger.INSTANCE;
+ /**
+ * Function to calculate the `RPS` (Request per second) cost of a given stream operation.
+ */
public static final CostFunction<StreamOp> RPS_COST_FUNCTION = new CostFunction<StreamOp>() {
@Override
public int apply(StreamOp op) {
@@ -48,6 +54,9 @@ public class RequestLimiterBuilder {
}
};
+ /**
+ * Function to calculate the `BPS` (Bytes per second) cost of a given stream operation.
+ */
public static final CostFunction<StreamOp> BPS_COST_FUNCTION = new CostFunction<StreamOp>() {
@Override
public int apply(StreamOp op) {
@@ -60,6 +69,9 @@ public class RequestLimiterBuilder {
}
};
+ /**
+ * Function to check if a stream operation will cause {@link OverCapacityException}.
+ */
public static final OverlimitFunction<StreamOp> NOP_OVERLIMIT_FUNCTION = new OverlimitFunction<StreamOp>() {
@Override
public void apply(StreamOp op) throws OverCapacityException {
@@ -96,9 +108,9 @@ public class RequestLimiterBuilder {
}
public RequestLimiter<StreamOp> build() {
- Preconditions.checkNotNull(limiter);
- Preconditions.checkNotNull(overlimitFunction);
- Preconditions.checkNotNull(costFunction);
+ checkNotNull(limiter);
+ checkNotNull(overlimitFunction);
+ checkNotNull(costFunction);
return new ComposableRequestLimiter(limiter, overlimitFunction, costFunction, statsLogger);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
index 69a8470..a3e1efb 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/ServiceRequestLimiter.java
@@ -22,9 +22,9 @@ import com.twitter.distributedlog.exceptions.OverCapacityException;
import com.twitter.distributedlog.limiter.ChainedRequestLimiter;
import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
import com.twitter.distributedlog.limiter.RequestLimiter;
+import com.twitter.distributedlog.rate.MovingAverageRate;
import com.twitter.distributedlog.service.stream.StreamManager;
import com.twitter.distributedlog.service.stream.StreamOp;
-import com.twitter.distributedlog.rate.MovingAverageRate;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.StatsLogger;
@@ -89,8 +89,10 @@ public class ServiceRequestLimiter extends DynamicRequestLimiter<StreamOp> {
.limit(bpsSoftServiceLimit);
ChainedRequestLimiter.Builder<StreamOp> builder = new ChainedRequestLimiter.Builder<StreamOp>();
- builder.addLimiter(new StreamAcquireLimiter(streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire")));
- builder.addLimiter(new StreamAcquireLimiter(streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire")));
+ builder.addLimiter(new StreamAcquireLimiter(
+ streamManager, serviceRps, rpsStreamAcquireLimit, limiterStatLogger.scope("rps_acquire")));
+ builder.addLimiter(new StreamAcquireLimiter(
+ streamManager, serviceBps, bpsStreamAcquireLimit, limiterStatLogger.scope("bps_acquire")));
builder.addLimiter(bpsHardLimiterBuilder.build());
builder.addLimiter(bpsSoftLimiterBuilder.build());
builder.addLimiter(rpsHardLimiterBuilder.build());
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
index a417e81..5015751 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamAcquireLimiter.java
@@ -20,13 +20,15 @@ package com.twitter.distributedlog.service.stream.limiter;
import com.twitter.distributedlog.exceptions.OverCapacityException;
import com.twitter.distributedlog.exceptions.TooManyStreamsException;
import com.twitter.distributedlog.limiter.RequestLimiter;
+import com.twitter.distributedlog.rate.MovingAverageRate;
import com.twitter.distributedlog.service.stream.StreamManager;
import com.twitter.distributedlog.service.stream.StreamOp;
-import com.twitter.distributedlog.rate.MovingAverageRate;
-
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
+/**
+ * A special limiter that limits the acquisition of new streams.
+ */
public class StreamAcquireLimiter implements RequestLimiter<StreamOp> {
private final StreamManager streamManager;
private final MovingAverageRate serviceRps;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
index b4836d1..fa601d1 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/StreamRequestLimiter.java
@@ -19,13 +19,16 @@ package com.twitter.distributedlog.service.stream.limiter;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.OverCapacityException;
-import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
import com.twitter.distributedlog.limiter.ChainedRequestLimiter;
+import com.twitter.distributedlog.limiter.ComposableRequestLimiter.OverlimitFunction;
import com.twitter.distributedlog.limiter.RequestLimiter;
import com.twitter.distributedlog.service.stream.StreamOp;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.StatsLogger;
+/**
+ * A dynamic request limiter that limits stream operations.
+ */
public class StreamRequestLimiter extends DynamicRequestLimiter<StreamOp> {
private final DynamicDistributedLogConfiguration dynConf;
private final StatsLogger limiterStatLogger;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java
new file mode 100644
index 0000000..533c75a
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/limiter/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Request Rate Limiting.
+ */
+package com.twitter.distributedlog.service.stream.limiter;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java
new file mode 100644
index 0000000..389acd9
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Stream Related Operations.
+ */
+package com.twitter.distributedlog.service.stream;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
index de35fc2..9afd234 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/CacheableStreamPartitionConverter.java
@@ -20,6 +20,9 @@ package com.twitter.distributedlog.service.streamset;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+/**
+ * A stream-to-partition converter that caches the mapping between stream and partitions.
+ */
public abstract class CacheableStreamPartitionConverter implements StreamPartitionConverter {
private final ConcurrentMap<String, Partition> partitions;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
index 4576560..d69a393 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/DelimiterStreamPartitionConverter.java
@@ -20,7 +20,7 @@ package com.twitter.distributedlog.service.streamset;
import org.apache.commons.lang3.StringUtils;
/**
- * Stream Partition Converter
+ * Stream Partition Converter that converts the stream name into a stream-to-partition mapping by delimiter.
*/
public class DelimiterStreamPartitionConverter extends CacheableStreamPartitionConverter {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
index d199f88..770c631 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/Partition.java
@@ -22,6 +22,7 @@ import com.google.common.base.Objects;
/**
* `Partition` defines the relationship between a `virtual` stream and a
* physical DL stream.
+ *
* <p>A `virtual` stream could be partitioned into multiple partitions
* and each partition is effectively a DL stream.
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java
index 9c882a6..1962e5f 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/PartitionMap.java
@@ -22,6 +22,9 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+/**
+ * A mapping between a logical stream and a set of physical partitions.
+ */
public class PartitionMap {
private final Map<String, Set<Partition>> partitionMap;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java
new file mode 100644
index 0000000..3888e40
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/streamset/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * StreamSet - A logical set of streams.
+ */
+package com.twitter.distributedlog.service.streamset;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
index b37de10..8ff2f26 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/ProxyTool.java
@@ -30,6 +30,10 @@ import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Await;
import com.twitter.util.Duration;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
@@ -37,18 +41,16 @@ import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.InetSocketAddress;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Tools to interact with proxies.
*/
public class ProxyTool extends Tool {
- static final Logger logger = LoggerFactory.getLogger(ProxyTool.class);
+ private static final Logger logger = LoggerFactory.getLogger(ProxyTool.class);
+ /**
+ * Abstract Cluster level command.
+ */
protected abstract static class ClusterCommand extends OptsCommand {
protected Options options = new Options();
@@ -59,8 +61,8 @@ public class ProxyTool extends Tool {
super(name, description);
options.addOption("u", "uri", true, "DistributedLog URI");
options.addOption("r", "prefix", true, "Prefix of stream name. E.g. 'QuantumLeapTest-'.");
- options.addOption("e", "expression", true, "Expression to generate stream suffix. " +
- "Currently we support range '0-9', list '1,2,3' and name '143'");
+ options.addOption("e", "expression", true, "Expression to generate stream suffix. "
+ + "Currently we support range '0-9', list '1,2,3' and name '143'");
}
@Override
@@ -157,6 +159,9 @@ public class ProxyTool extends Tool {
}
}
+ /**
+ * Command to release ownership of a log stream.
+ */
static class ReleaseCommand extends ClusterCommand {
double rate = 100f;
@@ -196,6 +201,9 @@ public class ProxyTool extends Tool {
}
}
+ /**
+ * Command to truncate a log stream.
+ */
static class TruncateCommand extends ClusterCommand {
DLSN dlsn = DLSN.InitialDLSN;
@@ -234,6 +242,9 @@ public class ProxyTool extends Tool {
}
}
+ /**
+ * Abstract command to operate on a single proxy server.
+ */
protected abstract static class ProxyCommand extends OptsCommand {
protected Options options = new Options();
@@ -291,6 +302,9 @@ public class ProxyTool extends Tool {
protected abstract int runCmd(Pair<DistributedLogClient, MonitorServiceClient> client) throws Exception;
}
+ /**
+ * Command to enable/disable accepting new streams.
+ */
static class AcceptNewStreamCommand extends ProxyCommand {
boolean enabled = false;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java
new file mode 100644
index 0000000..1e32fd3
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/tools/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Service related tools.
+ */
+package com.twitter.distributedlog.service.tools;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java
new file mode 100644
index 0000000..e6dcec6
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/utils/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Utilities used by proxy servers.
+ */
+package com.twitter.distributedlog.service.utils;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
index c08f0f0..8db3e90 100644
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/CodahaleMetricsServletProvider.java
@@ -23,11 +23,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Extend the codahale metrics provider to run servlets
+ * Extend the codahale metrics provider to run servlets.
*/
public class CodahaleMetricsServletProvider extends CodahaleMetricsProvider {
- private final static Logger logger = LoggerFactory.getLogger(CodahaleMetricsServletProvider.class);
+ private static final Logger logger = LoggerFactory.getLogger(CodahaleMetricsServletProvider.class);
ServletReporter servletReporter = null;
private final HealthCheckRegistry healthCheckRegistry = new HealthCheckRegistry();
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
index 98cd8b3..348787a 100644
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/HealthCheckServletContextListener.java
@@ -21,7 +21,7 @@ import com.codahale.metrics.health.HealthCheckRegistry;
import com.codahale.metrics.servlets.HealthCheckServlet;
/**
- * Health Check Servlet Listener
+ * Health Check Servlet Listener.
*/
public class HealthCheckServletContextListener extends HealthCheckServlet.ContextListener {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
index e401ac0..15279fe 100644
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/MetricsServletContextListener.java
@@ -20,6 +20,9 @@ package org.apache.bookkeeper.stats;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.servlets.MetricsServlet;
+/**
+ * A servlet context listener to report metrics.
+ */
public class MetricsServletContextListener extends MetricsServlet.ContextListener {
private final MetricRegistry metricRegistry;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
index 9cf0610..267f75a 100644
--- a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/ServletReporter.java
@@ -25,7 +25,7 @@ import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
/**
- * Starts a jetty server on a configurable port to export stats
+ * Starts a jetty server on a configurable port to export stats.
*/
public class ServletReporter {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
new file mode 100644
index 0000000..d00b64d
--- /dev/null
+++ b/distributedlog-service/src/main/java/org/apache/bookkeeper/stats/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Extension of {@link org.apache.bookkeeper.stats.CodahaleMetricsProvider}.
+ */
+package org.apache.bookkeeper.stats;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
index 10941ba..922e901 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/client/routing/LocalRoutingService.java
@@ -20,7 +20,6 @@ package com.twitter.distributedlog.client.routing;
import com.google.common.collect.Sets;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.stats.StatsReceiver;
-
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.LinkedHashSet;
@@ -28,12 +27,18 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+/**
+ * A local routing service that is used for testing.
+ */
public class LocalRoutingService implements RoutingService {
public static Builder newBuilder() {
return new Builder();
}
+ /**
+ * Builder to build a local routing service for testing.
+ */
public static class Builder implements RoutingService.Builder {
private Builder() {}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
index 4d29f21..f7e81dc 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/DistributedLogServerTestCase.java
@@ -17,39 +17,38 @@
*/
package com.twitter.distributedlog.service;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
+import com.twitter.distributedlog.DLMTestUtil;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.client.DistributedLogClientImpl;
import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
import com.twitter.distributedlog.client.routing.LocalRoutingService;
import com.twitter.distributedlog.client.routing.RegionsRoutingService;
import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
-import com.twitter.distributedlog.service.stream.StreamManagerImpl;
import com.twitter.distributedlog.service.stream.StreamManager;
-import com.twitter.distributedlog.DLMTestUtil;
-import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.distributedlog.service.stream.StreamManagerImpl;
import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
import com.twitter.util.Duration;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.SocketAddress;
import java.net.URI;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
-import static org.junit.Assert.*;
-
+/**
+ * Base test case for distributedlog servers.
+ */
public abstract class DistributedLogServerTestCase {
- protected static final Logger LOG = LoggerFactory.getLogger(DistributedLogServerTestCase.class);
-
protected static DistributedLogConfiguration conf =
new DistributedLogConfiguration().setLockTimeout(10)
.setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10);
@@ -59,6 +58,9 @@ public abstract class DistributedLogServerTestCase {
protected static DistributedLogCluster dlCluster;
protected static DistributedLogCluster noAdHocCluster;
+ /**
+ * A distributedlog client wrapper for testing.
+ */
protected static class DLClient {
public final LocalRoutingService routingService;
public DistributedLogClientBuilder dlClientBuilder;
@@ -94,6 +96,9 @@ public abstract class DistributedLogServerTestCase {
}
}
+ /**
+ * A distributedlog client wrapper that talks to two regions.
+ */
protected static class TwoRegionDLClient {
public final LocalRoutingService localRoutingService;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
index 218ea06..24d7f07 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerBase.java
@@ -17,22 +17,30 @@
*/
package com.twitter.distributedlog.service;
+import static com.google.common.base.Charsets.UTF_8;
+import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import com.google.common.base.Optional;
import com.twitter.distributedlog.AsyncLogReader;
import com.twitter.distributedlog.DLMTestUtil;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogManager;
-import com.twitter.distributedlog.TestZooKeeperClientBuilder;
-import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
-import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.LogReader;
import com.twitter.distributedlog.LogRecord;
import com.twitter.distributedlog.LogRecordWithDLSN;
+import com.twitter.distributedlog.TestZooKeeperClientBuilder;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.acl.AccessControlManager;
-import com.twitter.distributedlog.impl.acl.ZKAccessControl;
+import com.twitter.distributedlog.annotations.DistributedLogAnnotations;
import com.twitter.distributedlog.client.routing.LocalRoutingService;
import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.exceptions.LogNotFoundException;
+import com.twitter.distributedlog.impl.acl.ZKAccessControl;
import com.twitter.distributedlog.impl.metadata.BKDLConfig;
import com.twitter.distributedlog.namespace.DistributedLogNamespace;
import com.twitter.distributedlog.service.stream.StreamManagerImpl;
@@ -49,13 +57,6 @@ import com.twitter.util.Await;
import com.twitter.util.Duration;
import com.twitter.util.Future;
import com.twitter.util.Futures;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -64,17 +65,19 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
-import static com.google.common.base.Charsets.UTF_8;
-import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
+/**
+ * Test Case for {@link DistributedLogServer}.
+ */
public abstract class TestDistributedLogServerBase extends DistributedLogServerTestCase {
- static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogServerBase.class);
@Rule
public TestName testName = new TestName();
@@ -84,7 +87,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
}
/**
- * {@link https://issues.apache.org/jira/browse/DL-27}
+ * See <a href="https://issues.apache.org/jira/browse/DL-27">DL-27</a>.
*/
@DistributedLogAnnotations.FlakyTest
@Ignore
@@ -212,10 +215,11 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
writes.add(null);
try {
- List<Future<DLSN>> futureResult = dlClient.dlClient.writeBulk(name, writes);
+ dlClient.dlClient.writeBulk(name, writes);
fail("should not have succeeded");
} catch (NullPointerException npe) {
- ; // expected
+ // expected
+ logger.info("Expected to catch NullPointException.");
}
}
@@ -246,7 +250,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
for (int i = start; i < finish; i++) {
Future<DLSN> future = futures.get(i);
try {
- DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+ Await.result(future, Duration.fromSeconds(10));
fail("future should have failed!");
} catch (DLException cre) {
++failed;
@@ -259,7 +263,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
void validateFailedAsLogRecordTooLong(Future<DLSN> future) {
try {
- DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+ Await.result(future, Duration.fromSeconds(10));
fail("should have failed");
} catch (DLException dle) {
assertEquals(StatusCode.TOO_LARGE_RECORD, dle.getCode());
@@ -276,7 +280,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
final int writeCount = 100;
- List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount*2 + 1);
+ List<ByteBuffer> writes = new ArrayList<ByteBuffer>(writeCount * 2 + 1);
for (long i = 1; i <= writeCount; i++) {
writes.add(ByteBuffer.wrap(("" + i).getBytes()));
}
@@ -293,7 +297,7 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
for (int i = 0; i < writeCount; i++) {
Future<DLSN> future = futures.get(i);
try {
- DLSN dlsn = Await.result(future, Duration.fromSeconds(10));
+ Await.result(future, Duration.fromSeconds(10));
++succeeded;
} catch (Exception ex) {
failDueToWrongException(ex);
@@ -510,7 +514,9 @@ public abstract class TestDistributedLogServerBase extends DistributedLogServerT
}
}
- /** This tests that create has touch like behavior in that trying to create the stream twice, simply does nothing */
+ /**
+ * Tests that create has touch-like behavior: trying to create the same stream twice simply does nothing.
+ */
@Test(timeout = 60000)
public void testCreateStreamTwice() throws Exception {
try {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java
index 0e5f187..b776543 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogServerClientRouting.java
@@ -17,14 +17,13 @@
*/
package com.twitter.distributedlog.service;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.fail;
+
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.util.Await;
-import org.junit.Test;
-
import java.nio.ByteBuffer;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.fail;
+import org.junit.Test;
/**
* Test the server with client side routing.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
index f97399d..d9d2b21 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java
@@ -17,10 +17,17 @@
*/
package com.twitter.distributedlog.service;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import com.google.common.collect.Lists;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.distributedlog.TestDistributedLogBase;
import com.twitter.distributedlog.acl.DefaultAccessControlManager;
import com.twitter.distributedlog.client.routing.LocalRoutingService;
@@ -29,11 +36,11 @@ import com.twitter.distributedlog.exceptions.StreamUnavailableException;
import com.twitter.distributedlog.service.config.NullStreamConfigProvider;
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.placement.EqualLoadAppraiser;
-import com.twitter.distributedlog.service.stream.WriteOp;
-import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus;
+import com.twitter.distributedlog.service.stream.Stream;
import com.twitter.distributedlog.service.stream.StreamImpl;
+import com.twitter.distributedlog.service.stream.StreamImpl.StreamStatus;
import com.twitter.distributedlog.service.stream.StreamManagerImpl;
-import com.twitter.distributedlog.service.stream.Stream;
+import com.twitter.distributedlog.service.stream.WriteOp;
import com.twitter.distributedlog.service.streamset.DelimiterStreamPartitionConverter;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
@@ -43,8 +50,15 @@ import com.twitter.distributedlog.thrift.service.WriteContext;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.ConfUtils;
import com.twitter.distributedlog.util.FutureUtils;
+import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.util.Await;
import com.twitter.util.Future;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.SettableFeature;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ReflectionUtils;
@@ -57,22 +71,12 @@ import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
/**
- * Test Case for DistributedLog Service
+ * Test Case for DistributedLog Service.
*/
public class TestDistributedLogService extends TestDistributedLogBase {
- static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class);
+ private static final Logger logger = LoggerFactory.getLogger(TestDistributedLogService.class);
@Rule
public TestName testName = new TestName();
@@ -357,12 +361,12 @@ public class TestDistributedLogService extends TestDistributedLogBase {
Future<Void> closeFuture0 = s.requestClose("close 0");
assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
- StreamStatus.CLOSING == s.getStatus() ||
- StreamStatus.CLOSED == s.getStatus());
+ StreamStatus.CLOSING == s.getStatus()
+ || StreamStatus.CLOSED == s.getStatus());
Future<Void> closeFuture1 = s.requestClose("close 1");
assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
- StreamStatus.CLOSING == s.getStatus() ||
- StreamStatus.CLOSED == s.getStatus());
+ StreamStatus.CLOSING == s.getStatus()
+ || StreamStatus.CLOSED == s.getStatus());
Await.result(closeFuture0);
assertEquals("Stream " + streamName + " should be set to " + StreamStatus.CLOSED,
@@ -386,8 +390,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
Future<Void> closeFuture = s.requestClose("close");
assertTrue("Stream " + streamName + " should be set to " + StreamStatus.CLOSING,
- StreamStatus.CLOSING == s.getStatus() ||
- StreamStatus.CLOSED == s.getStatus());
+ StreamStatus.CLOSING == s.getStatus()
+ || StreamStatus.CLOSED == s.getStatus());
WriteOp op1 = createWriteOp(service, streamName, 0L);
s.submit(op1);
WriteResponse response1 = Await.result(op1.result());
@@ -430,8 +434,8 @@ public class TestDistributedLogService extends TestDistributedLogBase {
StreamImpl s = (StreamImpl) streamManager.getCachedStreams().get(streamName);
// the stream should be set CLOSING
- while (StreamStatus.CLOSING != s.getStatus() &&
- StreamStatus.CLOSED != s.getStatus()) {
+ while (StreamStatus.CLOSING != s.getStatus()
+ && StreamStatus.CLOSED != s.getStatus()) {
TimeUnit.MILLISECONDS.sleep(20);
}
assertNotNull("Writer should be initialized", s.getWriter());
@@ -443,9 +447,9 @@ public class TestDistributedLogService extends TestDistributedLogBase {
futureList.get(i).isDefined());
WriteResponse response = Await.result(futureList.get(i));
assertTrue("Op should fail with " + StatusCode.WRITE_CANCELLED_EXCEPTION,
- StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() ||
- StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() ||
- StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
+ StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
+ || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+ || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
}
while (streamManager.getCachedStreams().containsKey(streamName)) {
@@ -518,7 +522,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
public void testTruncateOpNoChecksum() throws Exception {
DistributedLogServiceImpl localService = createConfiguredLocalService();
WriteContext ctx = new WriteContext();
- Future<WriteResponse> result = localService.truncate("test", new DLSN(1,2,3).serialize(), ctx);
+ Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
WriteResponse resp = Await.result(result);
assertEquals(StatusCode.SUCCESS, resp.getHeader().getCode());
localService.shutdown();
@@ -580,7 +584,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
ProtocolUtils.writeOpCRC32("test", buffer.array()));
// Overwrite 1 byte to corrupt data.
- buffer.put(1, (byte)0xab);
+ buffer.put(1, (byte) 0xab);
Future<WriteResponse> result = localService.writeWithContext("test", buffer, ctx);
WriteResponse resp = Await.result(result);
assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
@@ -607,7 +611,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
public void testTruncateOpChecksumBadChecksum() throws Exception {
DistributedLogServiceImpl localService = createConfiguredLocalService();
WriteContext ctx = new WriteContext().setCrc32(999);
- Future<WriteResponse> result = localService.truncate("test", new DLSN(1,2,3).serialize(), ctx);
+ Future<WriteResponse> result = localService.truncate("test", new DLSN(1, 2, 3).serialize(), ctx);
WriteResponse resp = Await.result(result);
assertEquals(StatusCode.CHECKSUM_FAILED, resp.getHeader().getCode());
localService.shutdown();
@@ -620,7 +624,7 @@ public class TestDistributedLogService extends TestDistributedLogBase {
new NullStatsLogger(),
new IdentityStreamPartitionConverter(),
new ServerConfiguration(),
- (byte)0,
+ (byte) 0,
checksum,
false,
disabledFeature,
@@ -651,8 +655,14 @@ public class TestDistributedLogService extends TestDistributedLogBase {
String streamName = testName.getMethodName();
SettableFeature disabledFeature = new SettableFeature("", 1);
- WriteOp writeOp0 = getWriteOp(streamName, disabledFeature, ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
- WriteOp writeOp1 = getWriteOp(streamName, disabledFeature, ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
+ WriteOp writeOp0 = getWriteOp(
+ streamName,
+ disabledFeature,
+ ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
+ WriteOp writeOp1 = getWriteOp(
+ streamName,
+ disabledFeature,
+ ProtocolUtils.writeOpCRC32(streamName, "test".getBytes()));
writeOp0.preExecute();
disabledFeature.set(0);
@@ -699,9 +709,9 @@ public class TestDistributedLogService extends TestDistributedLogBase {
for (Future<WriteResponse> future : futureList) {
WriteResponse response = Await.result(future);
assertTrue("Op should succeed or be rejected : " + response.getHeader().getCode(),
- StatusCode.SUCCESS == response.getHeader().getCode() ||
- StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() ||
- StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode());
+ StatusCode.SUCCESS == response.getHeader().getCode()
+ || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+ || StatusCode.STREAM_UNAVAILABLE == response.getHeader().getCode());
}
assertTrue("There should be no streams in the cache",
streamManager.getCachedStreams().isEmpty());
@@ -757,9 +767,9 @@ public class TestDistributedLogService extends TestDistributedLogBase {
WriteResponse response = Await.result(future);
assertTrue("Op should fail with " + StatusCode.BK_TRANSMIT_ERROR + " or be rejected : "
+ response.getHeader().getCode(),
- StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode() ||
- StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() ||
- StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
+ StatusCode.BK_TRANSMIT_ERROR == response.getHeader().getCode()
+ || StatusCode.WRITE_EXCEPTION == response.getHeader().getCode()
+ || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode());
}
// acquired streams should all been removed after we close them
assertTrue("There should be no streams in the acquired cache",
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java
index f4c140e..50c915f 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestRegionUnavailable.java
@@ -20,6 +20,12 @@ package com.twitter.distributedlog.service;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.feature.DefaultFeatureProvider;
import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.feature.SettableFeature;
@@ -28,18 +34,14 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link com.twitter.distributedlog.exceptions.RegionUnavailableException}.
+ */
public class TestRegionUnavailable extends DistributedLogServerTestCase {
+ /**
+ * A feature provider for testing.
+ */
public static class TestFeatureProvider extends DefaultFeatureProvider {
public TestFeatureProvider(String rootScope,
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java
index eb65ad4..d8ef302 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestStatsFilter.java
@@ -17,18 +17,19 @@
*/
package com.twitter.distributedlog.service;
+import static org.junit.Assert.assertEquals;
+
import com.twitter.finagle.Service;
import com.twitter.finagle.service.ConstantService;
import com.twitter.util.Await;
import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.stats.NullStatsLogger;
-
+import org.apache.bookkeeper.stats.StatsLogger;
import org.junit.Test;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link StatsFilter}.
+ */
public class TestStatsFilter {
class RuntimeExService<Req, Rep> extends Service<Req, Rep> {
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java
index 9e04089..2ae3a23 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestBalancerUtils.java
@@ -17,13 +17,15 @@
*/
package com.twitter.distributedlog.service.balancer;
-import org.junit.Test;
+import static org.junit.Assert.assertEquals;
import java.util.HashMap;
import java.util.Map;
+import org.junit.Test;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link BalancerUtils}.
+ */
public class TestBalancerUtils {
@Test(timeout = 60000)
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java
index 05eb724..8a24e21 100644
--- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java
+++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/balancer/TestClusterBalancer.java
@@ -17,6 +17,9 @@
*/
package com.twitter.distributedlog.service.balancer;
+import static com.google.common.base.Charsets.UTF_8;
+import static org.junit.Assert.fail;
+
import com.google.common.base.Optional;
import com.google.common.util.concurrent.RateLimiter;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
@@ -25,6 +28,11 @@ import com.twitter.distributedlog.service.DistributedLogClient;
import com.twitter.distributedlog.service.DistributedLogCluster.DLServer;
import com.twitter.distributedlog.service.DistributedLogServerTestCase;
import com.twitter.util.Await;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
import org.apache.commons.lang3.tuple.Pair;
import org.junit.After;
import org.junit.Before;
@@ -33,18 +41,12 @@ import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.*;
-
+/**
+ * Test Case for {@link ClusterBalancer}.
+ */
public class TestClusterBalancer extends DistributedLogServerTestCase {
- static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class);
+ private static final Logger logger = LoggerFactory.getLogger(TestClusterBalancer.class);
private final int numServers = 5;
private final List<DLServer> cluster;
@@ -108,7 +110,7 @@ public class TestClusterBalancer extends DistributedLogServerTestCase {
for (int i = 0; i < numStreams; i++) {
String name = namePrefix + (streamId++);
try {
- Await.result(((DistributedLogClient) client.dlClient).write(name, ByteBuffer.wrap(name.getBytes(UTF_8))));
+ Await.result(client.dlClient.write(name, ByteBuffer.wrap(name.getBytes(UTF_8))));
} catch (Exception e) {
logger.error("Error writing stream {} : ", name, e);
throw e;
@@ -145,7 +147,7 @@ public class TestClusterBalancer extends DistributedLogServerTestCase {
Optional<RateLimiter> rateLimiter = Optional.absent();
Balancer balancer = new ClusterBalancer(client.dlClientBuilder,
- Pair.of((DistributedLogClient)client.dlClient, (MonitorServiceClient)client.dlClient));
+ Pair.of((DistributedLogClient) client.dlClient, (MonitorServiceClient) client.dlClient));
logger.info("Rebalancing from 'unknown' target");
try {
balancer.balanceAll("unknown", 10, rateLimiter);
[3/4] incubator-distributedlog git commit: DL-132: Enable check style
for distributedlog service module.
Posted by si...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
index 111a874..0ee7db4 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/ServiceStreamConfigProvider.java
@@ -21,26 +21,27 @@ import com.google.common.base.Optional;
import com.twitter.distributedlog.config.DynamicConfigurationFactory;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
-import org.apache.commons.configuration.ConfigurationException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.File;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.configuration.ConfigurationException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Provide per stream configuration to DistributedLog service layer.
*/
public class ServiceStreamConfigProvider implements StreamConfigProvider {
- static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(ServiceStreamConfigProvider.class);
+
+ private static final String CONFIG_EXTENSION = "conf";
private final File configBaseDir;
private final File defaultConfigFile;
private final StreamPartitionConverter partitionConverter;
private final DynamicConfigurationFactory configFactory;
private final DynamicDistributedLogConfiguration defaultDynConf;
- private final static String CONFIG_EXTENSION = "conf";
public ServiceStreamConfigProvider(String configPath,
String defaultConfigPath,
@@ -51,11 +52,13 @@ public class ServiceStreamConfigProvider implements StreamConfigProvider {
throws ConfigurationException {
this.configBaseDir = new File(configPath);
if (!configBaseDir.exists()) {
- throw new ConfigurationException("Stream configuration base directory " + configPath + " does not exist");
+ throw new ConfigurationException("Stream configuration base directory "
+ + configPath + " does not exist");
}
this.defaultConfigFile = new File(configPath);
if (!defaultConfigFile.exists()) {
- throw new ConfigurationException("Stream configuration default config " + defaultConfigPath + " does not exist");
+ throw new ConfigurationException("Stream configuration default config "
+ + defaultConfigPath + " does not exist");
}
// Construct reloading default configuration
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
new file mode 100644
index 0000000..bb0026a
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/config/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Server Configurations.
+ */
+package com.twitter.distributedlog.service.config;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
new file mode 100644
index 0000000..4fb3673
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * DistributedLog Proxy Service.
+ */
+package com.twitter.distributedlog.service;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
index 144e358..fb2d6d2 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/EqualLoadAppraiser.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,18 +20,20 @@ package com.twitter.distributedlog.service.placement;
import com.twitter.util.Future;
/**
- * Created for those who hold these truths to be self-evident, that all streams are created equal,
+ * Equal Load Appraiser.
+ *
+ * <p>Created for those who hold these truths to be self-evident, that all streams are created equal,
* that they are endowed by their creator with certain unalienable loads, that among these are
* Uno, Eins, and One.
*/
public class EqualLoadAppraiser implements LoadAppraiser {
- @Override
- public Future<StreamLoad> getStreamLoad(String stream) {
- return Future.value(new StreamLoad(stream, 1));
- }
+ @Override
+ public Future<StreamLoad> getStreamLoad(String stream) {
+ return Future.value(new StreamLoad(stream, 1));
+ }
- @Override
- public Future<Void> refreshCache() {
- return Future.value(null);
- }
+ @Override
+ public Future<Void> refreshCache() {
+ return Future.value(null);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
index 8c8dc23..c25c267 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LeastLoadPlacementPolicy.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,12 @@
*/
package com.twitter.distributedlog.service.placement;
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.util.Duration;
+import com.twitter.util.Function;
+import com.twitter.util.Future;
+import com.twitter.util.Futures;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -24,174 +30,171 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-
-import scala.Function1;
-import scala.runtime.BoxedUnit;
-
import org.apache.bookkeeper.stats.Gauge;
-import org.apache.bookkeeper.stats.Stats;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
-
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.util.Duration;
-import com.twitter.util.Function;
-import com.twitter.util.Future;
-import com.twitter.util.Futures;
+import org.slf4j.LoggerFactory;
+import scala.runtime.BoxedUnit;
/**
- * A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
+ * Least Load Placement Policy.
+ *
+ * <p>A LoadPlacementPolicy that attempts to place streams in such a way that the load is balanced as
* evenly as possible across all shards. The LoadAppraiser remains responsible for determining what
* the load of a server would be. This placement policy then distributes these streams across the
* servers.
*/
public class LeastLoadPlacementPolicy extends PlacementPolicy {
- private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
- private Map<String, String> streamToServer = new HashMap<String, String>();
-
- public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
- DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
- Duration refreshInterval, StatsLogger statsLogger) {
- super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
- statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
- @Override
- public Number getDefaultValue() {
- return 0;
- }
-
- @Override
- public Number getSample() {
- if (serverLoads.size() > 0) {
- return serverLoads.last().getLoad() - serverLoads.first().getLoad();
- } else {
- return getDefaultValue();
- }
- }
- });
- }
-
- private synchronized String getStreamOwner(String stream) {
- return streamToServer.get(stream);
- }
-
- @Override
- public Future<String> placeStream(String stream) {
- String streamOwner = getStreamOwner(stream);
- if (null != streamOwner) {
- return Future.value(streamOwner);
+
+ private static final Logger logger = LoggerFactory.getLogger(LeastLoadPlacementPolicy.class);
+
+ private TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+ private Map<String, String> streamToServer = new HashMap<String, String>();
+
+ public LeastLoadPlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+ DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+ Duration refreshInterval, StatsLogger statsLogger) {
+ super(loadAppraiser, routingService, namespace, placementStateManager, refreshInterval, statsLogger);
+ statsLogger.registerGauge("placement/load.diff", new Gauge<Number>() {
+ @Override
+ public Number getDefaultValue() {
+ return 0;
+ }
+
+ @Override
+ public Number getSample() {
+ if (serverLoads.size() > 0) {
+ return serverLoads.last().getLoad() - serverLoads.first().getLoad();
+ } else {
+ return getDefaultValue();
+ }
+ }
+ });
}
- Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
- return streamLoadFuture.map(new Function<StreamLoad, String>() {
- @Override
- public String apply(StreamLoad streamLoad) {
- return placeStreamSynchronized(streamLoad);
- }
- });
- }
-
- synchronized private String placeStreamSynchronized(StreamLoad streamLoad) {
- ServerLoad serverLoad = serverLoads.pollFirst();
- serverLoad.addStream(streamLoad);
- serverLoads.add(serverLoad);
- return serverLoad.getServer();
- }
-
- @Override
- public void refresh() {
- logger.info("Refreshing server loads.");
- Future<Void> refresh = loadAppraiser.refreshCache();
- final Set<String> servers = getServers();
- final Set<String> allStreams = getStreams();
- Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(new Function<Void, Future<TreeSet<ServerLoad>>>() {
- @Override
- public Future<TreeSet<ServerLoad>> apply(Void v1) {
- return calculate(servers, allStreams);
- }
- });
- serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
- try {
- updateServerLoads(serverLoads);
- } catch (PlacementStateManager.StateManagerSaveException e) {
- logger.error("The refreshed mapping could not be persisted and will not be used.", e);
- }
- return BoxedUnit.UNIT;
- }
- });
- }
-
- synchronized private void updateServerLoads(TreeSet<ServerLoad> serverLoads) throws PlacementStateManager.StateManagerSaveException {
- this.placementStateManager.saveOwnership(serverLoads);
- this.streamToServer = serverLoadsToMap(serverLoads);
- this.serverLoads = serverLoads;
- }
-
- @Override
- synchronized public void load(TreeSet<ServerLoad> serverLoads) {
- this.serverLoads = serverLoads;
- this.streamToServer = serverLoadsToMap(serverLoads);
- }
-
- public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
- logger.info("Calculating server loads");
- final long startTime = System.currentTimeMillis();
- ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
-
- for (String stream: streams) {
- Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
- futures.add(streamLoad);
+
+ private synchronized String getStreamOwner(String stream) {
+ return streamToServer.get(stream);
}
- return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
- @Override
- public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
- /* Sort streamLoads so largest streams are placed first for better balance */
- TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
- for (StreamLoad streamLoad: streamLoads) {
- streamQueue.add(streamLoad);
+ @Override
+ public Future<String> placeStream(String stream) {
+ String streamOwner = getStreamOwner(stream);
+ if (null != streamOwner) {
+ return Future.value(streamOwner);
}
+ Future<StreamLoad> streamLoadFuture = loadAppraiser.getStreamLoad(stream);
+ return streamLoadFuture.map(new Function<StreamLoad, String>() {
+ @Override
+ public String apply(StreamLoad streamLoad) {
+ return placeStreamSynchronized(streamLoad);
+ }
+ });
+ }
- TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
- for (String server: servers) {
- ServerLoad serverLoad = new ServerLoad(server);
- if (!streamQueue.isEmpty()) {
- serverLoad.addStream(streamQueue.pollFirst());
- }
- serverLoads.add(serverLoad);
+ private synchronized String placeStreamSynchronized(StreamLoad streamLoad) {
+ ServerLoad serverLoad = serverLoads.pollFirst();
+ serverLoad.addStream(streamLoad);
+ serverLoads.add(serverLoad);
+ return serverLoad.getServer();
+ }
+
+ @Override
+ public void refresh() {
+ logger.info("Refreshing server loads.");
+ Future<Void> refresh = loadAppraiser.refreshCache();
+ final Set<String> servers = getServers();
+ final Set<String> allStreams = getStreams();
+ Future<TreeSet<ServerLoad>> serverLoadsFuture = refresh.flatMap(
+ new Function<Void, Future<TreeSet<ServerLoad>>>() {
+ @Override
+ public Future<TreeSet<ServerLoad>> apply(Void v1) {
+ return calculate(servers, allStreams);
+ }
+ });
+ serverLoadsFuture.map(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+ try {
+ updateServerLoads(serverLoads);
+ } catch (PlacementStateManager.StateManagerSaveException e) {
+ logger.error("The refreshed mapping could not be persisted and will not be used.", e);
+ }
+ return BoxedUnit.UNIT;
+ }
+ });
+ }
+
+ private synchronized void updateServerLoads(TreeSet<ServerLoad> serverLoads)
+ throws PlacementStateManager.StateManagerSaveException {
+ this.placementStateManager.saveOwnership(serverLoads);
+ this.streamToServer = serverLoadsToMap(serverLoads);
+ this.serverLoads = serverLoads;
+ }
+
+ @Override
+ public synchronized void load(TreeSet<ServerLoad> serverLoads) {
+ this.serverLoads = serverLoads;
+ this.streamToServer = serverLoadsToMap(serverLoads);
+ }
+
+ public Future<TreeSet<ServerLoad>> calculate(final Set<String> servers, Set<String> streams) {
+ logger.info("Calculating server loads");
+ final long startTime = System.currentTimeMillis();
+ ArrayList<Future<StreamLoad>> futures = new ArrayList<Future<StreamLoad>>(streams.size());
+
+ for (String stream : streams) {
+ Future<StreamLoad> streamLoad = loadAppraiser.getStreamLoad(stream);
+ futures.add(streamLoad);
}
- while (!streamQueue.isEmpty()) {
- ServerLoad serverLoad = serverLoads.pollFirst();
- serverLoad.addStream(streamQueue.pollFirst());
- serverLoads.add(serverLoad);
+ return Futures.collect(futures).map(new Function<List<StreamLoad>, TreeSet<ServerLoad>>() {
+ @Override
+ public TreeSet<ServerLoad> apply(List<StreamLoad> streamLoads) {
+ /* Sort streamLoads so largest streams are placed first for better balance */
+ TreeSet<StreamLoad> streamQueue = new TreeSet<StreamLoad>();
+ for (StreamLoad streamLoad : streamLoads) {
+ streamQueue.add(streamLoad);
+ }
+
+ TreeSet<ServerLoad> serverLoads = new TreeSet<ServerLoad>();
+ for (String server : servers) {
+ ServerLoad serverLoad = new ServerLoad(server);
+ if (!streamQueue.isEmpty()) {
+ serverLoad.addStream(streamQueue.pollFirst());
+ }
+ serverLoads.add(serverLoad);
+ }
+
+ while (!streamQueue.isEmpty()) {
+ ServerLoad serverLoad = serverLoads.pollFirst();
+ serverLoad.addStream(streamQueue.pollFirst());
+ serverLoads.add(serverLoad);
+ }
+ return serverLoads;
+ }
+ }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
+ placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
+ return BoxedUnit.UNIT;
+ }
+ }).onFailure(new Function<Throwable, BoxedUnit>() {
+ @Override
+ public BoxedUnit apply(Throwable t) {
+ logger.error("Failure calculating loads", t);
+ placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
+ return BoxedUnit.UNIT;
+ }
+ });
+ }
+
+ private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
+ HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
+ for (ServerLoad serverLoad : serverLoads) {
+ for (StreamLoad streamLoad : serverLoad.getStreamLoads()) {
+ streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
+ }
}
- return serverLoads;
- }
- }).onSuccess(new Function<TreeSet<ServerLoad>, BoxedUnit>() {
- @Override
- public BoxedUnit apply(TreeSet<ServerLoad> serverLoads) {
- placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime);
- return BoxedUnit.UNIT;
- }
- }).onFailure(new Function<Throwable, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Throwable t) {
- logger.error("Failure calculating loads", t);
- placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime);
- return BoxedUnit.UNIT;
- }
- });
- }
-
- private static Map<String, String> serverLoadsToMap(Collection<ServerLoad> serverLoads) {
- HashMap<String, String> streamToServer = new HashMap<String, String>(serverLoads.size());
- for (ServerLoad serverLoad: serverLoads) {
- for (StreamLoad streamLoad: serverLoad.getStreamLoads()) {
- streamToServer.put(streamLoad.getStream(), serverLoad.getServer());
- }
+ return streamToServer;
}
- return streamToServer;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
index 784f106..53c568a 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/LoadAppraiser.java
@@ -19,7 +19,21 @@ package com.twitter.distributedlog.service.placement;
import com.twitter.util.Future;
+/**
+ * Interface for load appraiser.
+ */
public interface LoadAppraiser {
- Future<StreamLoad> getStreamLoad(String stream);
- Future<Void> refreshCache();
+ /**
+ * Retrieve the stream load for a given {@code stream}.
+ *
+ * @param stream name of the stream
+ * @return the stream load of the stream.
+ */
+ Future<StreamLoad> getStreamLoad(String stream);
+
+ /**
+ * Refresh the cache.
+ * @return a future that is satisfied once the cache refresh has finished.
+ */
+ Future<Void> refreshCache();
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
index 2044428..46e8940 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementPolicy.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,6 +17,15 @@
*/
package com.twitter.distributedlog.service.placement;
+import com.twitter.distributedlog.client.routing.RoutingService;
+import com.twitter.distributedlog.namespace.DistributedLogNamespace;
+import com.twitter.distributedlog.service.DLSocketAddress;
+import com.twitter.util.Duration;
+import com.twitter.util.Function0;
+import com.twitter.util.Future;
+import com.twitter.util.ScheduledThreadPoolTimer;
+import com.twitter.util.Time;
+import com.twitter.util.Timer;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -24,125 +33,116 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
-
-import scala.runtime.BoxedUnit;
-
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
-import com.twitter.distributedlog.client.routing.RoutingService;
-import com.twitter.distributedlog.namespace.DistributedLogNamespace;
-import com.twitter.distributedlog.service.DLSocketAddress;
-import com.twitter.util.Duration;
-import com.twitter.util.Function0;
-import com.twitter.util.Future;
-import com.twitter.util.ScheduledThreadPoolTimer;
-import com.twitter.util.Time;
-import com.twitter.util.Timer;
+import scala.runtime.BoxedUnit;
/**
- * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream
- * contains. The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
+ * A PlacementPolicy assigns streams to servers given an appraisal of the load that the stream contains.
+ *
+ * <p>The load of a stream is determined by the LoadAppraiser used. The PlacementPolicy will
* then distributed these StreamLoads to the available servers in a manner defined by the
* implementation creating ServerLoad objects. It then saves this assignment via the
* PlacementStateManager.
*/
public abstract class PlacementPolicy {
- protected final LoadAppraiser loadAppraiser;
- protected final RoutingService routingService;
- protected final DistributedLogNamespace namespace;
- protected final PlacementStateManager placementStateManager;
- private final Duration refreshInterval;
- protected static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
- protected final OpStatsLogger placementCalcStats;
- private Timer placementRefreshTimer;
+ private static final Logger logger = LoggerFactory.getLogger(PlacementPolicy.class);
- public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
- DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
- Duration refreshInterval, StatsLogger statsLogger) {
- this.loadAppraiser = loadAppraiser;
- this.routingService = routingService;
- this.namespace = namespace;
- this.placementStateManager = placementStateManager;
- this.refreshInterval = refreshInterval;
- placementCalcStats = statsLogger.getOpStatsLogger("placement");
- }
+ protected final LoadAppraiser loadAppraiser;
+ protected final RoutingService routingService;
+ protected final DistributedLogNamespace namespace;
+ protected final PlacementStateManager placementStateManager;
+ private final Duration refreshInterval;
+ protected final OpStatsLogger placementCalcStats;
+ private Timer placementRefreshTimer;
- public Set<String> getServers() {
- Set<SocketAddress> hosts = routingService.getHosts();
- Set<String> servers = new HashSet<String>(hosts.size());
- for (SocketAddress address: hosts) {
- servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+ public PlacementPolicy(LoadAppraiser loadAppraiser, RoutingService routingService,
+ DistributedLogNamespace namespace, PlacementStateManager placementStateManager,
+ Duration refreshInterval, StatsLogger statsLogger) {
+ this.loadAppraiser = loadAppraiser;
+ this.routingService = routingService;
+ this.namespace = namespace;
+ this.placementStateManager = placementStateManager;
+ this.refreshInterval = refreshInterval;
+ placementCalcStats = statsLogger.getOpStatsLogger("placement");
}
- return servers;
- }
- public Set<String> getStreams() {
- Set<String> streams = new HashSet<String>();
- try {
- Iterator<String> logs = namespace.getLogs();
- while (logs.hasNext()) {
- streams.add(logs.next());
- }
- } catch (IOException e) {
- logger.error("Could not get streams for placement policy.", e);
+ public Set<String> getServers() {
+ Set<SocketAddress> hosts = routingService.getHosts();
+ Set<String> servers = new HashSet<String>(hosts.size());
+ for (SocketAddress address : hosts) {
+ servers.add(DLSocketAddress.toString((InetSocketAddress) address));
+ }
+ return servers;
}
- return streams;
- }
-
- public void start(boolean leader) {
- logger.info("Starting placement policy");
- TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
- for (String server: getServers()) {
- emptyServerLoads.add(new ServerLoad(server));
+ public Set<String> getStreams() {
+ Set<String> streams = new HashSet<String>();
+ try {
+ Iterator<String> logs = namespace.getLogs();
+ while (logs.hasNext()) {
+ streams.add(logs.next());
+ }
+ } catch (IOException e) {
+ logger.error("Could not get streams for placement policy.", e);
+ }
+ return streams;
}
- load(emptyServerLoads); //Pre-Load so streams don't NPE
- if (leader) { //this is the leader shard
- logger.info("Shard is leader. Scheduling timed refresh.");
- placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
- placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
- @Override
- public BoxedUnit apply() {
- refresh();
- return BoxedUnit.UNIT;
+
+ public void start(boolean leader) {
+ logger.info("Starting placement policy");
+
+ TreeSet<ServerLoad> emptyServerLoads = new TreeSet<ServerLoad>();
+ for (String server : getServers()) {
+ emptyServerLoads.add(new ServerLoad(server));
}
- });
- } else {
- logger.info("Shard is not leader. Watching for server load changes.");
- placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
- @Override
- public void callback(TreeSet<ServerLoad> serverLoads) {
- if (!serverLoads.isEmpty()) {
- load(serverLoads);
- }
+ load(emptyServerLoads); //Pre-Load so streams don't NPE
+ if (leader) { //this is the leader shard
+ logger.info("Shard is leader. Scheduling timed refresh.");
+ placementRefreshTimer = new ScheduledThreadPoolTimer(1, "timer", true);
+ placementRefreshTimer.schedule(Time.now(), refreshInterval, new Function0<BoxedUnit>() {
+ @Override
+ public BoxedUnit apply() {
+ refresh();
+ return BoxedUnit.UNIT;
+ }
+ });
+ } else {
+ logger.info("Shard is not leader. Watching for server load changes.");
+ placementStateManager.watch(new PlacementStateManager.PlacementCallback() {
+ @Override
+ public void callback(TreeSet<ServerLoad> serverLoads) {
+ if (!serverLoads.isEmpty()) {
+ load(serverLoads);
+ }
+ }
+ });
}
- });
}
- }
- public void close() {
- if (placementRefreshTimer != null) {
- placementRefreshTimer.stop();
+ public void close() {
+ if (placementRefreshTimer != null) {
+ placementRefreshTimer.stop();
+ }
}
- }
- /**
- * Places the stream on a server according to the policy and returns a future contianing the
- * host that owns the stream upon completion
- */
- public abstract Future<String> placeStream(String stream);
+ /**
+ * Places the stream on a server according to the policy.
+ *
+ * <p>It returns a future containing the host that owns the stream upon completion.
+ */
+ public abstract Future<String> placeStream(String stream);
- /**
- * Recalculates the entire placement mapping and updates stores it using the PlacementStateManager
- */
- public abstract void refresh();
+ /**
+ * Recalculates the entire placement mapping and stores it using the PlacementStateManager.
+ */
+ public abstract void refresh();
- /**
- * Loads the placement mapping into the node from a TreeSet of ServerLoads
- */
- public abstract void load(TreeSet<ServerLoad> serverLoads);
+ /**
+ * Loads the placement mapping into the node from a TreeSet of ServerLoads.
+ */
+ public abstract void load(TreeSet<ServerLoad> serverLoads);
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
index cd0d906..17e4685 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/PlacementStateManager.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,46 +20,60 @@ package com.twitter.distributedlog.service.placement;
import java.util.TreeSet;
/**
- * The PlacementStateManager handles persistence of calculated resource placements including, the
- * storage once the calculated, and the retrieval by the other shards.
+ * The PlacementStateManager handles persistence of calculated resource placements.
*/
public interface PlacementStateManager {
- /**
- * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage
- */
- void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
+ /**
+ * Saves the ownership mapping as a TreeSet of ServerLoads to persistent storage.
+ */
+ void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException;
- /**
- * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage
- */
- TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
+ /**
+ * Loads the ownership mapping as TreeSet of ServerLoads from persistent storage.
+ */
+ TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException;
- /**
- * Watch the persistent storage for changes to the ownership mapping and calls placementCallback
- * with the new mapping when a change occurs
- */
- void watch(PlacementCallback placementCallback);
+ /**
+ * Watch the persistent storage for changes to the ownership mapping.
+ *
+ * <p>The placementCallback callbacks will be triggered with the new mapping when a change occurs.
+ */
+ void watch(PlacementCallback placementCallback);
- interface PlacementCallback {
- void callback(TreeSet<ServerLoad> serverLoads);
- }
+ /**
+ * Placement Callback.
+ *
+ * <p>The callback is triggered when server loads are updated.
+ */
+ interface PlacementCallback {
+ void callback(TreeSet<ServerLoad> serverLoads);
+ }
- abstract class StateManagerException extends Exception {
- public StateManagerException(String message, Exception e) {
- super(message, e);
+ /**
+ * The base exception thrown when state manager encounters errors.
+ */
+ abstract class StateManagerException extends Exception {
+ public StateManagerException(String message, Exception e) {
+ super(message, e);
+ }
}
- }
- class StateManagerLoadException extends StateManagerException {
- public StateManagerLoadException(Exception e) {
- super("Load of Ownership failed", e);
+ /**
+ * Exception thrown when loading the ownership mapping fails.
+ */
+ class StateManagerLoadException extends StateManagerException {
+ public StateManagerLoadException(Exception e) {
+ super("Load of Ownership failed", e);
+ }
}
- }
- class StateManagerSaveException extends StateManagerException {
- public StateManagerSaveException(Exception e) {
- super("Save of Ownership failed", e);
+ /**
+ * Exception thrown when saving the ownership mapping fails.
+ */
+ class StateManagerSaveException extends StateManagerException {
+ public StateManagerSaveException(Exception e) {
+ super("Save of Ownership failed", e);
+ }
}
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
index 801e499..a0b4959 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ServerLoad.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,139 +17,142 @@
*/
package com.twitter.distributedlog.service.placement;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Set;
-import com.twitter.distributedlog.service.placement.thrift.*;
-
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TMemoryInputTransport;
-import static com.google.common.base.Charsets.UTF_8;
-
/**
- * A comparable data object containing the identifier of the server, total appraised load on the
+ * An object that represents the server load.
+ *
+ * <p>A comparable data object containing the identifier of the server, total appraised load on the
* server, and all streams assigned to the server by the resource placement mapping. This is
* comparable first by load and then by server so that a sorted data structure of these will be
* consistent across multiple calculations.
*/
public class ServerLoad implements Comparable {
- private static final int BUFFER_SIZE = 4096000;
- private final String server;
- private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
- private long load = 0l;
-
- public ServerLoad(String server) {
- this.server = server;
- }
-
- synchronized public long addStream(StreamLoad stream) {
- this.load += stream.getLoad();
- streamLoads.add(stream);
- return this.load;
- }
-
- synchronized public long removeStream(String stream) {
- for (StreamLoad streamLoad : streamLoads) {
- if (streamLoad.stream.equals(stream)) {
- this.load -= streamLoad.getLoad();
- streamLoads.remove(streamLoad);
+ private static final int BUFFER_SIZE = 4096000;
+ private final String server;
+ private final HashSet<StreamLoad> streamLoads = new HashSet<StreamLoad>();
+ private long load = 0L;
+
+ public ServerLoad(String server) {
+ this.server = server;
+ }
+
+ public synchronized long addStream(StreamLoad stream) {
+ this.load += stream.getLoad();
+ streamLoads.add(stream);
return this.load;
- }
}
- return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
- }
-
- public synchronized long getLoad() {
- return load;
- }
-
- public synchronized Set<StreamLoad> getStreamLoads() {
- return streamLoads;
- }
-
- public synchronized String getServer() {
- return server;
- }
-
- protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
- com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
- = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
- tServerLoad.setServer(server);
- tServerLoad.setLoad(load);
- ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads
- = new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
- for (StreamLoad streamLoad: streamLoads) {
- tStreamLoads.add(streamLoad.toThrift());
+
+ public synchronized long removeStream(String stream) {
+ for (StreamLoad streamLoad : streamLoads) {
+ if (streamLoad.stream.equals(stream)) {
+ this.load -= streamLoad.getLoad();
+ streamLoads.remove(streamLoad);
+ return this.load;
+ }
+ }
+ return this.load; //Throwing an exception wouldn't help us as our logic should never reach here
+ }
+
+ public synchronized long getLoad() {
+ return load;
+ }
+
+ public synchronized Set<StreamLoad> getStreamLoads() {
+ return streamLoads;
+ }
+
+ public synchronized String getServer() {
+ return server;
+ }
+
+ protected synchronized com.twitter.distributedlog.service.placement.thrift.ServerLoad toThrift() {
+ com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
+ new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+ tServerLoad.setServer(server);
+ tServerLoad.setLoad(load);
+ ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad> tStreamLoads =
+ new ArrayList<com.twitter.distributedlog.service.placement.thrift.StreamLoad>();
+ for (StreamLoad streamLoad : streamLoads) {
+ tStreamLoads.add(streamLoad.toThrift());
+ }
+ tServerLoad.setStreams(tStreamLoads);
+ return tServerLoad;
+ }
+
+ public byte[] serialize() throws IOException {
+ TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+ TJSONProtocol protocol = new TJSONProtocol(transport);
+ try {
+ toThrift().write(protocol);
+ transport.flush();
+ return transport.toString(UTF_8.name()).getBytes(UTF_8);
+ } catch (TException e) {
+ throw new IOException("Failed to serialize server load : ", e);
+ } catch (UnsupportedEncodingException uee) {
+ throw new IOException("Failed to serialize server load : ", uee);
+ }
+ }
+
+ public static ServerLoad deserialize(byte[] data) throws IOException {
+ com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad =
+ new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
+ TMemoryInputTransport transport = new TMemoryInputTransport(data);
+ TJSONProtocol protocol = new TJSONProtocol(transport);
+ try {
+ tServerLoad.read(protocol);
+ ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
+ if (tServerLoad.isSetStreams()) {
+ for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad :
+ tServerLoad.getStreams()) {
+ serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+ }
+ }
+ return serverLoad;
+ } catch (TException e) {
+ throw new IOException("Failed to deserialize server load : ", e);
+ }
}
- tServerLoad.setStreams(tStreamLoads);
- return tServerLoad;
- }
-
- public byte[] serialize() throws IOException {
- TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- toThrift().write(protocol);
- transport.flush();
- return transport.toString(UTF_8.name()).getBytes(UTF_8);
- } catch (TException e) {
- throw new IOException("Failed to serialize server load : ", e);
- } catch (UnsupportedEncodingException uee) {
- throw new IOException("Failed to serialize server load : ", uee);
+
+ @Override
+ public synchronized int compareTo(Object o) {
+ ServerLoad other = (ServerLoad) o;
+ if (load == other.getLoad()) {
+ return server.compareTo(other.getServer());
+ } else {
+ return Long.compare(load, other.getLoad());
+ }
}
- }
-
- public static ServerLoad deserialize(byte[] data) throws IOException {
- com.twitter.distributedlog.service.placement.thrift.ServerLoad tServerLoad
- = new com.twitter.distributedlog.service.placement.thrift.ServerLoad();
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- tServerLoad.read(protocol);
- ServerLoad serverLoad = new ServerLoad(tServerLoad.getServer());
- if (tServerLoad.isSetStreams()) {
- for (com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad : tServerLoad.getStreams()) {
- serverLoad.addStream(new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad()));
+
+ @Override
+ public synchronized boolean equals(Object o) {
+ if (!(o instanceof ServerLoad)) {
+ return false;
}
- }
- return serverLoad;
- } catch (TException e) {
- throw new IOException("Failed to deserialize server load : ", e);
+ ServerLoad other = (ServerLoad) o;
+ return server.equals(other.getServer())
+ && load == other.getLoad()
+ && streamLoads.equals(other.getStreamLoads());
}
- }
-
- @Override
- public synchronized int compareTo(Object o) {
- ServerLoad other = (ServerLoad) o;
- if (load == other.getLoad()) {
- return server.compareTo(other.getServer());
- } else {
- return Long.compare(load, other.getLoad());
+
+ @Override
+ public synchronized String toString() {
+ return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
}
- }
- @Override
- public synchronized boolean equals(Object o) {
- if (!(o instanceof ServerLoad)) {
- return false;
+ @Override
+ public synchronized int hashCode() {
+ return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
}
- ServerLoad other = (ServerLoad) o;
- return server.equals(other.getServer()) && load == other.getLoad() && streamLoads.equals(other.getStreamLoads());
- }
-
- @Override
- public synchronized String toString() {
- return String.format("ServerLoad<Server: %s, Load: %d, Streams: %s>", server, load, streamLoads);
- }
-
- @Override
- public synchronized int hashCode() {
- return new HashCodeBuilder().append(server).append(load).append(streamLoads).build();
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
index d7b7efd..c0b0ce1 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/StreamLoad.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,96 +17,99 @@
*/
package com.twitter.distributedlog.service.placement;
+import static com.google.common.base.Charsets.UTF_8;
+
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TJSONProtocol;
import org.apache.thrift.transport.TMemoryBuffer;
import org.apache.thrift.transport.TMemoryInputTransport;
-import static com.google.common.base.Charsets.UTF_8;
-
/**
- * A comparable data object containing the identifier of the stream and the appraised load produced
+ * An object that represents the load of a stream.
+ *
+ * <p>A comparable data object containing the identifier of the stream and the appraised load produced
* by the stream.
*/
public class StreamLoad implements Comparable {
- private static final int BUFFER_SIZE = 4096;
- public final String stream;
- private final int load;
+ private static final int BUFFER_SIZE = 4096;
+ public final String stream;
+ private final int load;
- public StreamLoad(String stream, int load) {
- this.stream = stream;
- this.load = load;
- }
+ public StreamLoad(String stream, int load) {
+ this.stream = stream;
+ this.load = load;
+ }
- public int getLoad() {
- return load;
- }
+ public int getLoad() {
+ return load;
+ }
- public String getStream() {
- return stream;
- }
+ public String getStream() {
+ return stream;
+ }
- protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
- com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
- return tStreamLoad.setStream(stream).setLoad(load);
- }
+ protected com.twitter.distributedlog.service.placement.thrift.StreamLoad toThrift() {
+ com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
+ new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+ return tStreamLoad.setStream(stream).setLoad(load);
+ }
- public byte[] serialize() throws IOException {
- TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- toThrift().write(protocol);
- transport.flush();
- return transport.toString(UTF_8.name()).getBytes(UTF_8);
- } catch (TException e) {
- throw new IOException("Failed to serialize stream load : ", e);
- } catch (UnsupportedEncodingException uee) {
- throw new IOException("Failed to serialize stream load : ", uee);
+ public byte[] serialize() throws IOException {
+ TMemoryBuffer transport = new TMemoryBuffer(BUFFER_SIZE);
+ TJSONProtocol protocol = new TJSONProtocol(transport);
+ try {
+ toThrift().write(protocol);
+ transport.flush();
+ return transport.toString(UTF_8.name()).getBytes(UTF_8);
+ } catch (TException e) {
+ throw new IOException("Failed to serialize stream load : ", e);
+ } catch (UnsupportedEncodingException uee) {
+ throw new IOException("Failed to serialize stream load : ", uee);
+ }
}
- }
- public static StreamLoad deserialize(byte[] data) throws IOException {
- com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad = new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
- TMemoryInputTransport transport = new TMemoryInputTransport(data);
- TJSONProtocol protocol = new TJSONProtocol(transport);
- try {
- tStreamLoad.read(protocol);
- return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
- } catch (TException e) {
- throw new IOException("Failed to deserialize stream load : ", e);
+ public static StreamLoad deserialize(byte[] data) throws IOException {
+ com.twitter.distributedlog.service.placement.thrift.StreamLoad tStreamLoad =
+ new com.twitter.distributedlog.service.placement.thrift.StreamLoad();
+ TMemoryInputTransport transport = new TMemoryInputTransport(data);
+ TJSONProtocol protocol = new TJSONProtocol(transport);
+ try {
+ tStreamLoad.read(protocol);
+ return new StreamLoad(tStreamLoad.getStream(), tStreamLoad.getLoad());
+ } catch (TException e) {
+ throw new IOException("Failed to deserialize stream load : ", e);
+ }
}
- }
- @Override
- public int compareTo(Object o) {
- StreamLoad other = (StreamLoad) o;
- if (load == other.getLoad()) {
- return stream.compareTo(other.getStream());
- } else {
- return Long.compare(load, other.getLoad());
+ @Override
+ public int compareTo(Object o) {
+ StreamLoad other = (StreamLoad) o;
+ if (load == other.getLoad()) {
+ return stream.compareTo(other.getStream());
+ } else {
+ return Long.compare(load, other.getLoad());
+ }
}
- }
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof StreamLoad)) {
- return false;
+ @Override
+ public boolean equals(Object o) {
+ if (!(o instanceof StreamLoad)) {
+ return false;
+ }
+ StreamLoad other = (StreamLoad) o;
+ return stream.equals(other.getStream()) && load == other.getLoad();
}
- StreamLoad other = (StreamLoad) o;
- return stream.equals(other.getStream()) && load == other.getLoad();
- }
- @Override
- public String toString() {
- return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
- }
+ @Override
+ public String toString() {
+ return String.format("StreamLoad<Stream: %s, Load: %d>", stream, load);
+ }
- @Override
- public int hashCode() {
- return new HashCodeBuilder().append(stream).append(load).build();
- }
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder().append(stream).append(load).build();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
index 4f01bdc..977ae04 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/ZKPlacementStateManager.java
@@ -6,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -17,13 +17,16 @@
*/
package com.twitter.distributedlog.service.placement;
+import com.twitter.distributedlog.DistributedLogConfiguration;
+import com.twitter.distributedlog.ZooKeeperClient;
+import com.twitter.distributedlog.impl.BKNamespaceDriver;
+import com.twitter.distributedlog.util.Utils;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.List;
import java.util.TreeSet;
-
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
@@ -35,139 +38,136 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.twitter.distributedlog.BKDistributedLogNamespace;
-import com.twitter.distributedlog.DistributedLogConfiguration;
-import com.twitter.distributedlog.ZooKeeperClient;
-import com.twitter.distributedlog.impl.BKNamespaceDriver;
-import com.twitter.distributedlog.util.Utils;
-
/**
* An implementation of the PlacementStateManager that saves data to and loads from Zookeeper to
* avoid necessitating an additional system for the resource placement.
*/
public class ZKPlacementStateManager implements PlacementStateManager {
- static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
- private static final String SERVER_LOAD_DIR = "/.server-load";
- private final String serverLoadPath;
- private final ZooKeeperClient zkClient;
+ private static final Logger logger = LoggerFactory.getLogger(ZKPlacementStateManager.class);
- private boolean watching = false;
+ private static final String SERVER_LOAD_DIR = "/.server-load";
- public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
- String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
- zkClient = BKNamespaceDriver.createZKClientBuilder(
- String.format("ZKPlacementStateManager-%s", zkServers),
- conf,
- zkServers,
- statsLogger.scope("placement_state_manager")).build();
- serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
- }
+ private final String serverLoadPath;
+ private final ZooKeeperClient zkClient;
- private void createServerLoadPathIfNoExists(byte[] data)
- throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
- try {
- Utils.zkCreateFullPathOptimistic(zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
- } catch (KeeperException.NodeExistsException nee) {
- logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
+ private boolean watching = false;
+
+ public ZKPlacementStateManager(URI uri, DistributedLogConfiguration conf, StatsLogger statsLogger) {
+ String zkServers = BKNamespaceDriver.getZKServersFromDLUri(uri);
+ zkClient = BKNamespaceDriver.createZKClientBuilder(
+ String.format("ZKPlacementStateManager-%s", zkServers),
+ conf,
+ zkServers,
+ statsLogger.scope("placement_state_manager")).build();
+ serverLoadPath = uri.getPath() + SERVER_LOAD_DIR;
}
- }
-
- @Override
- public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
- logger.info("saving ownership");
- try {
- ZooKeeper zk = zkClient.get();
- // use timestamp as data so watchers will see any changes
- byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
-
- if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
- createServerLoadPathIfNoExists(timestamp);
- }
-
- Transaction tx = zk.transaction();
- List<String> children = zk.getChildren(serverLoadPath, false);
- HashSet<String> servers = new HashSet<String>(children);
- tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
- for (ServerLoad serverLoad : serverLoads) {
- String server = serverToZkFormat(serverLoad.getServer());
- String serverPath = serverPath(server);
- if (servers.contains(server)) {
- servers.remove(server);
- tx.setData(serverPath, serverLoad.serialize(), -1);
- } else {
- tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+
+ private void createServerLoadPathIfNoExists(byte[] data)
+ throws ZooKeeperClient.ZooKeeperConnectionException, KeeperException, InterruptedException {
+ try {
+ Utils.zkCreateFullPathOptimistic(
+ zkClient, serverLoadPath, data, zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+ } catch (KeeperException.NodeExistsException nee) {
+ logger.debug("the server load path {} is already created by others", serverLoadPath, nee);
}
- }
- for (String server : servers) {
- tx.delete(serverPath(server), -1);
- }
- tx.commit();
- } catch (InterruptedException | IOException | KeeperException e) {
- throw new StateManagerSaveException(e);
}
- }
-
- @Override
- public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
- TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
- try {
- ZooKeeper zk = zkClient.get();
- List<String> children = zk.getChildren(serverLoadPath, false);
- for (String server : children) {
- ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
- }
- return ownerships;
- } catch (InterruptedException | IOException | KeeperException e) {
- throw new StateManagerLoadException(e);
+
+ @Override
+ public void saveOwnership(TreeSet<ServerLoad> serverLoads) throws StateManagerSaveException {
+ logger.info("saving ownership");
+ try {
+ ZooKeeper zk = zkClient.get();
+ // use timestamp as data so watchers will see any changes
+ byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+
+ if (zk.exists(serverLoadPath, false) == null) { //create path to rootnode if it does not yet exist
+ createServerLoadPathIfNoExists(timestamp);
+ }
+
+ Transaction tx = zk.transaction();
+ List<String> children = zk.getChildren(serverLoadPath, false);
+ HashSet<String> servers = new HashSet<String>(children);
+ tx.setData(serverLoadPath, timestamp, -1); // trigger the watcher that data has been updated
+ for (ServerLoad serverLoad : serverLoads) {
+ String server = serverToZkFormat(serverLoad.getServer());
+ String serverPath = serverPath(server);
+ if (servers.contains(server)) {
+ servers.remove(server);
+ tx.setData(serverPath, serverLoad.serialize(), -1);
+ } else {
+ tx.create(serverPath, serverLoad.serialize(), zkClient.getDefaultACL(), CreateMode.PERSISTENT);
+ }
+ }
+ for (String server : servers) {
+ tx.delete(serverPath(server), -1);
+ }
+ tx.commit();
+ } catch (InterruptedException | IOException | KeeperException e) {
+ throw new StateManagerSaveException(e);
+ }
}
- }
- @Override
- synchronized public void watch(final PlacementCallback callback) {
- if (watching) {
- return; // do not double watch
+ @Override
+ public TreeSet<ServerLoad> loadOwnership() throws StateManagerLoadException {
+ TreeSet<ServerLoad> ownerships = new TreeSet<ServerLoad>();
+ try {
+ ZooKeeper zk = zkClient.get();
+ List<String> children = zk.getChildren(serverLoadPath, false);
+ for (String server : children) {
+ ownerships.add(ServerLoad.deserialize(zk.getData(serverPath(server), false, new Stat())));
+ }
+ return ownerships;
+ } catch (InterruptedException | IOException | KeeperException e) {
+ throw new StateManagerLoadException(e);
+ }
}
- watching = true;
-
- try {
- ZooKeeper zk = zkClient.get();
- try {
- zk.getData(serverLoadPath, new Watcher() {
- @Override
- public void process(WatchedEvent watchedEvent) {
+
+ @Override
+ public synchronized void watch(final PlacementCallback callback) {
+ if (watching) {
+ return; // do not double watch
+ }
+ watching = true;
+
+ try {
+ ZooKeeper zk = zkClient.get();
try {
- callback.callback(loadOwnership());
- } catch (StateManagerLoadException e) {
- logger.error("Watch of Ownership failed", e);
- } finally {
- watching = false;
- watch(callback);
+ zk.getData(serverLoadPath, new Watcher() {
+ @Override
+ public void process(WatchedEvent watchedEvent) {
+ try {
+ callback.callback(loadOwnership());
+ } catch (StateManagerLoadException e) {
+ logger.error("Watch of Ownership failed", e);
+ } finally {
+ watching = false;
+ watch(callback);
+ }
+ }
+ }, new Stat());
+ } catch (KeeperException.NoNodeException nee) {
+ byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
+ createServerLoadPathIfNoExists(timestamp);
+ watching = false;
+ watch(callback);
}
- }
- }, new Stat());
- } catch (KeeperException.NoNodeException nee) {
- byte[] timestamp = ByteBuffer.allocate(8).putLong(System.currentTimeMillis()).array();
- createServerLoadPathIfNoExists(timestamp);
- watching = false;
- watch(callback);
- }
- } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
- logger.error("Watch of Ownership failed", e);
- watching = false;
- watch(callback);
+ } catch (ZooKeeperClient.ZooKeeperConnectionException | InterruptedException | KeeperException e) {
+ logger.error("Watch of Ownership failed", e);
+ watching = false;
+ watch(callback);
+ }
}
- }
- public String serverPath(String server) {
- return String.format("%s/%s", serverLoadPath, server);
- }
+ public String serverPath(String server) {
+ return String.format("%s/%s", serverLoadPath, server);
+ }
- protected String serverToZkFormat(String server) {
- return server.replaceAll("/", "--");
- }
+ protected String serverToZkFormat(String server) {
+ return server.replaceAll("/", "--");
+ }
- protected String zkFormatToServer(String zkFormattedServer) {
- return zkFormattedServer.replaceAll("--", "/");
- }
+ protected String zkFormatToServer(String zkFormattedServer) {
+ return zkFormattedServer.replaceAll("--", "/");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
new file mode 100644
index 0000000..72c134b
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/placement/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Placement Policy to place streams across proxy services.
+ */
+package com.twitter.distributedlog.service.placement;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
index fbef587..b513e24 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
@@ -18,20 +18,18 @@
package com.twitter.distributedlog.service.stream;
import com.google.common.base.Stopwatch;
-
-import com.twitter.distributedlog.util.Sequencer;
-import com.twitter.util.Future;
-import com.twitter.util.FutureEventListener;
-import com.twitter.util.Promise;
-import com.twitter.util.Return;
-import com.twitter.util.Try;
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.exceptions.ChecksumFailedException;
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
-
+import com.twitter.distributedlog.util.Sequencer;
+import com.twitter.util.Future;
+import com.twitter.util.FutureEventListener;
+import com.twitter.util.Promise;
+import com.twitter.util.Return;
+import com.twitter.util.Try;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.OpStatsLogger;
@@ -40,8 +38,13 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
+/**
+ * Abstract Stream Operation.
+ */
public abstract class AbstractStreamOp<Response> implements StreamOp {
- static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
+
+ private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
+
protected final String stream;
protected final OpStatsLogger opStatsLogger;
private final Promise<Response> result = new Promise<Response>();
@@ -103,7 +106,7 @@ public abstract class AbstractStreamOp<Response> implements StreamOp {
}
/**
- * Fail with current <i>owner</i> and its reason <i>t</i>
+ * Fail with current <i>owner</i> and its reason <i>t</i>.
*
* @param cause
* failure reason
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
index ae0d67d..a385b84 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractWriteOp.java
@@ -17,17 +17,18 @@
*/
package com.twitter.distributedlog.service.stream;
-import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.distributedlog.service.ResponseUtils;
-import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.util.Future;
-
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.OpStatsLogger;
-
import scala.runtime.AbstractFunction1;
+/**
+ * Abstract Write Operation.
+ */
public abstract class AbstractWriteOp extends AbstractStreamOp<WriteResponse> {
protected AbstractWriteOp(String stream,
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
index c009bb9..4d50b66 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/BulkWriteOp.java
@@ -17,19 +17,13 @@
*/
package com.twitter.distributedlog.service.stream;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.DLSN;
-import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.LogRecord;
import com.twitter.distributedlog.acl.AccessControlManager;
+import com.twitter.distributedlog.exceptions.AlreadyClosedException;
import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.exceptions.RequestDeniedException;
import com.twitter.distributedlog.service.ResponseUtils;
@@ -39,21 +33,26 @@ import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.ConstFuture;
+import com.twitter.util.Future;
import com.twitter.util.Future$;
import com.twitter.util.FutureEventListener;
-import com.twitter.util.Future;
import com.twitter.util.Try;
-
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
-
import scala.runtime.AbstractFunction1;
+/**
+ * Bulk Write Operation.
+ */
public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements WriteOpWithPayload {
private final List<ByteBuffer> buffers;
private final long payloadSize;
@@ -77,9 +76,9 @@ public class BulkWriteOp extends AbstractStreamOp<BulkWriteResponse> implements
try {
result.get();
} catch (Exception ex) {
- if (ex instanceof OwnershipAcquireFailedException ||
- ex instanceof AlreadyClosedException ||
- ex instanceof LockingException) {
+ if (ex instanceof OwnershipAcquireFailedException
+ || ex instanceof AlreadyClosedException
+ || ex instanceof LockingException) {
def = true;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
index 1dde1f9..e30a989 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/DeleteOp.java
@@ -25,13 +25,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
-
import scala.runtime.AbstractFunction1;
+/**
+ * Operation to delete a log stream.
+ */
public class DeleteOp extends AbstractWriteOp {
private final StreamManager streamManager;
private final Counter deniedDeleteCounter;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
index 4b2cbc1..f34295b 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/HeartbeatOp.java
@@ -17,6 +17,8 @@
*/
package com.twitter.distributedlog.service.stream;
+import static com.google.common.base.Charsets.UTF_8;
+
import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.BKAsyncLogWriter;
import com.twitter.distributedlog.DLSN;
@@ -28,15 +30,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
-
import scala.runtime.AbstractFunction1;
-import static com.google.common.base.Charsets.UTF_8;
-
+/**
+ * Heartbeat Operation.
+ */
public class HeartbeatOp extends AbstractWriteOp {
static final byte[] HEARTBEAT_DATA = "heartbeat".getBytes(UTF_8);
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
index 25835f6..aa0f1a7 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/ReleaseOp.java
@@ -25,13 +25,14 @@ import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.Future;
-
-import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.StatsLogger;
-
import scala.runtime.AbstractFunction1;
+/**
+ * Operation to release ownership of a log stream.
+ */
public class ReleaseOp extends AbstractWriteOp {
private final StreamManager streamManager;
private final Counter deniedReleaseCounter;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
index a1e3e4f..e015e29 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/Stream.java
@@ -23,13 +23,14 @@ import com.twitter.util.Future;
import java.io.IOException;
/**
- * Stream is the per stream request handler in the DL service layer. The collection of Streams in
- * the proxy are managed by StreamManager.
+ * Stream is the per stream request handler in the DL service layer.
+ *
+ * <p>The collection of Streams in the proxy is managed by StreamManager.
*/
public interface Stream {
/**
- * Get the stream configuration for this stream
+ * Get the stream configuration for this stream.
*
* @return stream configuration
*/
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
index 51d7ffd..0dfbd69 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactory.java
@@ -19,6 +19,9 @@ package com.twitter.distributedlog.service.stream;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+/**
+ * Factory to create a stream with provided stream configuration {@code streamConf}.
+ */
public interface StreamFactory {
/**
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/1a30b0ce/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
----------------------------------------------------------------------
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
index cb28f1e..566ded6 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/StreamFactoryImpl.java
@@ -29,6 +29,9 @@ import com.twitter.util.Timer;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.jboss.netty.util.HashedWheelTimer;
+/**
+ * The implementation of {@link StreamFactory}.
+ */
public class StreamFactoryImpl implements StreamFactory {
private final String clientId;
private final StreamOpStats streamOpStats;