You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:27 UTC
[20/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
new file mode 100644
index 0000000..86d1c11
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.ownership;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.distributedlog.client.ClientConfig;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Map;
+import java.util.Set;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test Case for Ownership Cache.
+ */
+public class TestOwnershipCache {
+ // Exposes the name of the running test method; used below to derive per-test stream names.
+ @Rule
+ public TestName runtime = new TestName();
+ // Builds a cache from a default ClientConfig, a null second argument and NullStatsReceiver instances.
+ private static OwnershipCache createOwnershipCache() {
+ ClientConfig clientConfig = new ClientConfig();
+ return new OwnershipCache(clientConfig, null,
+ NullStatsReceiver.get(), NullStatsReceiver.get());
+ }
+ // Returns a loopback (127.0.0.1) address on the given port.
+ private static SocketAddress createSocketAddress(int port) {
+ return new InetSocketAddress("127.0.0.1", port);
+ }
+ // updateOwner should succeed for a stream without an owner and again when the owner is unchanged.
+ @Test(timeout = 60000)
+ public void testUpdateOwner() {
+ OwnershipCache cache = createOwnershipCache();
+ SocketAddress addr = createSocketAddress(1000);
+ String stream = runtime.getMethodName();
+
+ assertTrue("Should successfully update owner if no owner exists before",
+ cache.updateOwner(stream, addr));
+ assertEquals("Owner should be " + addr + " for stream " + stream,
+ addr, cache.getOwner(stream));
+ assertTrue("Should successfully update owner if old owner is same",
+ cache.updateOwner(stream, addr));
+ assertEquals("Owner should be " + addr + " for stream " + stream,
+ addr, cache.getOwner(stream));
+ }
+ // removeOwnerFromStream should drop the mapping only when the given address is the current owner.
+ @Test(timeout = 60000)
+ public void testRemoveOwnerFromStream() {
+ OwnershipCache cache = createOwnershipCache();
+ int initialPort = 2000;
+ int numProxies = 2;
+ int numStreamsPerProxy = 2;
+ for (int i = 0; i < numProxies; i++) {
+ SocketAddress addr = createSocketAddress(initialPort + i);
+ for (int j = 0; j < numStreamsPerProxy; j++) {
+ String stream = runtime.getMethodName() + "_" + i + "_" + j;
+ cache.updateOwner(stream, addr);
+ }
+ }
+ Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+ assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+ numProxies * numStreamsPerProxy, ownershipMap.size());
+ Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+ assertEquals("There should be " + numProxies + " proxies cached",
+ numProxies, ownershipDistribution.size());
+
+ String stream = runtime.getMethodName() + "_0_0";
+ SocketAddress owner = createSocketAddress(initialPort);
+
+ // remove non-existent mapping won't change anything
+ SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
+ cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr");
+ assertEquals("Owner " + owner + " should not be removed",
+ owner, cache.getOwner(stream));
+ ownershipMap = cache.getStreamOwnerMapping();
+ assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+ numProxies * numStreamsPerProxy, ownershipMap.size());
+
+ // remove existent mapping should remove ownership mapping
+ cache.removeOwnerFromStream(stream, owner, "remove-owner");
+ assertNull("Owner " + owner + " should be removed", cache.getOwner(stream));
+ ownershipMap = cache.getStreamOwnerMapping();
+ assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache",
+ numProxies * numStreamsPerProxy - 1, ownershipMap.size());
+ ownershipDistribution = cache.getStreamOwnershipDistribution();
+ assertEquals("There should still be " + numProxies + " proxies cached",
+ numProxies, ownershipDistribution.size());
+ Set<String> ownedStreams = ownershipDistribution.get(owner);
+ assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner,
+ numStreamsPerProxy - 1, ownedStreams.size());
+ assertFalse("Stream " + stream + " should not be owned by " + owner,
+ ownedStreams.contains(stream));
+ }
+ // removeAllStreamsFromOwner should drop every stream of a cached host and ignore unknown hosts.
+ @Test(timeout = 60000)
+ public void testRemoveAllStreamsFromOwner() {
+ OwnershipCache cache = createOwnershipCache();
+ int initialPort = 2000;
+ int numProxies = 2;
+ int numStreamsPerProxy = 2;
+ for (int i = 0; i < numProxies; i++) {
+ SocketAddress addr = createSocketAddress(initialPort + i);
+ for (int j = 0; j < numStreamsPerProxy; j++) {
+ String stream = runtime.getMethodName() + "_" + i + "_" + j;
+ cache.updateOwner(stream, addr);
+ }
+ }
+ Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+ assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+ numProxies * numStreamsPerProxy, ownershipMap.size());
+ Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+ assertEquals("There should be " + numProxies + " proxies cached",
+ numProxies, ownershipDistribution.size());
+
+ SocketAddress owner = createSocketAddress(initialPort);
+
+ // remove non-existent host won't change anything
+ SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
+ cache.removeAllStreamsFromOwner(nonExistentAddr);
+ ownershipMap = cache.getStreamOwnerMapping();
+ assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+ numProxies * numStreamsPerProxy, ownershipMap.size());
+ ownershipDistribution = cache.getStreamOwnershipDistribution();
+ assertEquals("There should still be " + numProxies + " proxies cached",
+ numProxies, ownershipDistribution.size());
+
+ // remove existent host should remove ownership mapping
+ cache.removeAllStreamsFromOwner(owner);
+ ownershipMap = cache.getStreamOwnerMapping();
+ assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache",
+ (numProxies - 1) * numStreamsPerProxy, ownershipMap.size());
+ ownershipDistribution = cache.getStreamOwnershipDistribution();
+ assertEquals("There should be " + (numProxies - 1) + " proxies cached",
+ numProxies - 1, ownershipDistribution.size());
+ assertFalse("Host " + owner + " should not be cached",
+ ownershipDistribution.containsKey(owner));
+ }
+ // updateOwner with a different address should move the stream from the old owner to the new one.
+ @Test(timeout = 60000)
+ public void testReplaceOwner() {
+ OwnershipCache cache = createOwnershipCache();
+ int initialPort = 2000;
+ int numProxies = 2;
+ int numStreamsPerProxy = 2;
+ for (int i = 0; i < numProxies; i++) {
+ SocketAddress addr = createSocketAddress(initialPort + i);
+ for (int j = 0; j < numStreamsPerProxy; j++) {
+ String stream = runtime.getMethodName() + "_" + i + "_" + j;
+ cache.updateOwner(stream, addr);
+ }
+ }
+ Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
+ assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+ numProxies * numStreamsPerProxy, ownershipMap.size());
+ Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
+ assertEquals("There should be " + numProxies + " proxies cached",
+ numProxies, ownershipDistribution.size());
+
+ String stream = runtime.getMethodName() + "_0_0";
+ SocketAddress oldOwner = createSocketAddress(initialPort);
+ SocketAddress newOwner = createSocketAddress(initialPort + 999);
+ // re-assign one stream of the first proxy to an address that owns nothing yet
+ cache.updateOwner(stream, newOwner);
+ assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner,
+ newOwner, cache.getOwner(stream));
+ ownershipMap = cache.getStreamOwnerMapping();
+ assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
+ numProxies * numStreamsPerProxy, ownershipMap.size());
+ assertEquals("Owner of " + stream + " should be " + newOwner,
+ newOwner, ownershipMap.get(stream));
+ ownershipDistribution = cache.getStreamOwnershipDistribution();
+ assertEquals("There should be " + (numProxies + 1) + " proxies cached",
+ numProxies + 1, ownershipDistribution.size());
+ Set<String> oldOwnedStreams = ownershipDistribution.get(oldOwner);
+ assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner,
+ numStreamsPerProxy - 1, oldOwnedStreams.size());
+ assertFalse("Stream " + stream + " should not be owned by " + oldOwner,
+ oldOwnedStreams.contains(stream));
+ Set<String> newOwnedStreams = ownershipDistribution.get(newOwner); // holds only the re-assigned stream
+ assertEquals("There should be only 1 stream owned by " + newOwner,
+ 1, newOwnedStreams.size());
+ assertTrue("Stream " + stream + " should be owned by " + newOwner,
+ newOwnedStreams.contains(stream));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
new file mode 100644
index 0000000..8ef33bd
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.BulkWriteResponse;
+import org.apache.distributedlog.thrift.service.ClientInfo;
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import org.apache.distributedlog.thrift.service.HeartbeatOptions;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import org.apache.distributedlog.thrift.service.WriteContext;
+import org.apache.distributedlog.thrift.service.WriteResponse;
+import com.twitter.util.Future;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * Mock DistributedLog Related Services.
+ */
+public class MockDistributedLogServices {
+
+ /**
+ * Mock basic service: every call returns Future.value(...) of a default-constructed response.
+ */
+ static class MockBasicService implements DistributedLogService.ServiceIface {
+
+ @Override
+ public Future<ServerInfo> handshake() {
+ return Future.value(new ServerInfo());
+ }
+
+ @Override
+ public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
+ return Future.value(new ServerInfo());
+ }
+
+ @Override
+ public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<WriteResponse> heartbeatWithOptions(String stream,
+ WriteContext ctx,
+ HeartbeatOptions options) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<WriteResponse> write(String stream,
+ ByteBuffer data) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<WriteResponse> writeWithContext(String stream,
+ ByteBuffer data,
+ WriteContext ctx) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<BulkWriteResponse> writeBulkWithContext(String stream,
+ List<ByteBuffer> data,
+ WriteContext ctx) {
+ return Future.value(new BulkWriteResponse());
+ }
+
+ @Override
+ public Future<WriteResponse> truncate(String stream,
+ String dlsn,
+ WriteContext ctx) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<WriteResponse> release(String stream,
+ WriteContext ctx) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<WriteResponse> create(String stream, WriteContext ctx) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<WriteResponse> delete(String stream,
+ WriteContext ctx) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<WriteResponse> getOwner(String stream, WriteContext ctx) {
+ return Future.value(new WriteResponse());
+ }
+
+ @Override
+ public Future<Void> setAcceptNewStream(boolean enabled) {
+ return Future.value(null); // Void result, so the future carries null; 'enabled' is ignored
+ }
+ }
+
+ /**
+ * Mock server info service: both handshake variants return the ServerInfo currently held in serverInfo.
+ */
+ public static class MockServerInfoService extends MockBasicService {
+ // Initialised to a default-constructed ServerInfo in the constructor; replaced by updateServerInfo.
+ protected ServerInfo serverInfo;
+
+ public MockServerInfoService() {
+ serverInfo = new ServerInfo();
+ }
+
+ public void updateServerInfo(ServerInfo serverInfo) {
+ this.serverInfo = serverInfo; // stores the reference as-is (no copy)
+ }
+
+ @Override
+ public Future<ServerInfo> handshake() {
+ return Future.value(serverInfo);
+ }
+
+ @Override
+ public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
+ return Future.value(serverInfo); // clientInfo is ignored
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
new file mode 100644
index 0000000..e38c2ed
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import org.apache.distributedlog.thrift.service.DistributedLogService;
+import java.net.SocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * Mock Proxy Client Builder.
+ */
+class MockProxyClientBuilder implements ProxyClient.Builder {
+ // Proxy client constructed with a fresh MockThriftClient and the supplied service implementation.
+ static class MockProxyClient extends ProxyClient {
+ MockProxyClient(SocketAddress address,
+ DistributedLogService.ServiceIface service) {
+ super(address, new MockThriftClient(), service);
+ }
+ }
+ // Clients registered through provideProxyClient, keyed by address.
+ private final ConcurrentMap<SocketAddress, MockProxyClient> clients =
+ new ConcurrentHashMap<SocketAddress, MockProxyClient>();
+ // Registers (or replaces) the client that build() returns for the given address.
+ public void provideProxyClient(SocketAddress address,
+ MockProxyClient proxyClient) {
+ clients.put(address, proxyClient);
+ }
+ // Returns the registered client, or null when none was provided for the address.
+ @Override
+ public ProxyClient build(SocketAddress address) {
+ return clients.get(address);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
new file mode 100644
index 0000000..ad1c878
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import com.twitter.finagle.Service;
+import com.twitter.finagle.thrift.ThriftClientRequest;
+import com.twitter.util.Future;
+
+/**
+ * Mock Thrift Client.
+ */
+class MockThriftClient extends Service<ThriftClientRequest, byte[]> {
+ @Override
+ public Future<byte[]> apply(ThriftClientRequest request) {
+ return Future.value(request.message); // echoes the request's message bytes back as the response
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
new file mode 100644
index 0000000..6d9a471
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
@@ -0,0 +1,368 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.proxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.distributedlog.client.ClientConfig;
+import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService;
+import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService;
+import org.apache.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.client.stats.ClientStats;
+import org.apache.distributedlog.thrift.service.ServerInfo;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.lang3.tuple.Pair;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+/**
+ * Test Proxy Client Manager.
+ */
+public class TestProxyClientManager {
+
+ @Rule
+ public TestName runtime = new TestName();
+ // Host provider backed by a mutable set; getHosts() returns an immutable copy of it.
+ static class TestHostProvider implements HostProvider {
+
+ Set<SocketAddress> hosts = new HashSet<SocketAddress>();
+
+ synchronized void addHost(SocketAddress host) {
+ hosts.add(host);
+ }
+
+ @Override
+ public synchronized Set<SocketAddress> getHosts() {
+ return ImmutableSet.copyOf(hosts);
+ }
+
+ }
+ // Convenience overload: manager backed by a new, initially empty TestHostProvider.
+ private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
+ long periodicHandshakeIntervalMs) {
+ HostProvider provider = new TestHostProvider();
+ return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs);
+ }
+ // Builds a manager with the given handshake interval and the ownership sync interval set to -1.
+ private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
+ HostProvider hostProvider,
+ long periodicHandshakeIntervalMs) {
+ ClientConfig clientConfig = new ClientConfig();
+ clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs);
+ clientConfig.setPeriodicOwnershipSyncIntervalMs(-1);
+ HashedWheelTimer dlTimer = new HashedWheelTimer( // NOTE(review): never stopped in this file - confirm who owns shutdown
+ new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(),
+ clientConfig.getRedirectBackoffStartMs(),
+ TimeUnit.MILLISECONDS);
+ return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider,
+ new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver()));
+ }
+ // Returns a loopback (127.0.0.1) address on the given port.
+ private static SocketAddress createSocketAddress(int port) {
+ return new InetSocketAddress("127.0.0.1", port);
+ }
+ // Proxy client for the address, backed by a new MockBasicService.
+ private static MockProxyClient createMockProxyClient(SocketAddress address) {
+ return new MockProxyClient(address, new MockBasicService());
+ }
+ // Proxy client paired with the MockServerInfoService it uses, pre-loaded with the given server info.
+ private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient(
+ SocketAddress address, ServerInfo serverInfo) {
+ MockServerInfoService service = new MockServerInfoService();
+ MockProxyClient proxyClient = new MockProxyClient(address, service);
+ service.updateServerInfo(serverInfo);
+ return Pair.of(proxyClient, service);
+ }
+ // createClient should return the builder's client and register it in the manager.
+ @Test(timeout = 60000)
+ public void testBasicCreateRemove() throws Exception {
+ SocketAddress address = createSocketAddress(1000);
+ MockProxyClientBuilder builder = new MockProxyClientBuilder();
+ MockProxyClient mockProxyClient = createMockProxyClient(address);
+ builder.provideProxyClient(address, mockProxyClient);
+
+ ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+ assertEquals("There should be no clients in the manager",
+ 0, clientManager.getNumProxies());
+ ProxyClient proxyClient = clientManager.createClient(address);
+ assertEquals("Create client should build the proxy client",
+ 1, clientManager.getNumProxies());
+ assertTrue("The client returned should be the same client that builder built",
+ mockProxyClient == proxyClient);
+ }
+ // getClient should create the client when the manager does not hold one yet.
+ @Test(timeout = 60000)
+ public void testGetShouldCreateClient() throws Exception {
+ SocketAddress address = createSocketAddress(2000);
+ MockProxyClientBuilder builder = new MockProxyClientBuilder();
+ MockProxyClient mockProxyClient = createMockProxyClient(address);
+ builder.provideProxyClient(address, mockProxyClient);
+
+ ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+ assertEquals("There should be no clients in the manager",
+ 0, clientManager.getNumProxies());
+ ProxyClient proxyClient = clientManager.getClient(address);
+ assertEquals("Get client should build the proxy client",
+ 1, clientManager.getNumProxies());
+ assertTrue("The client returned should be the same client that builder built",
+ mockProxyClient == proxyClient);
+ }
+ // removeClient(address, client) should remove only when the given client is the registered one.
+ @Test(timeout = 60000)
+ public void testConditionalRemoveClient() throws Exception {
+ SocketAddress address = createSocketAddress(3000);
+ MockProxyClientBuilder builder = new MockProxyClientBuilder();
+ MockProxyClient mockProxyClient = createMockProxyClient(address);
+ MockProxyClient anotherMockProxyClient = createMockProxyClient(address);
+ builder.provideProxyClient(address, mockProxyClient);
+
+ ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+ assertEquals("There should be no clients in the manager",
+ 0, clientManager.getNumProxies());
+ clientManager.createClient(address);
+ assertEquals("Create client should build the proxy client",
+ 1, clientManager.getNumProxies());
+ clientManager.removeClient(address, anotherMockProxyClient);
+ assertEquals("Conditional remove should not remove proxy client",
+ 1, clientManager.getNumProxies());
+ clientManager.removeClient(address, mockProxyClient);
+ assertEquals("Conditional remove should remove proxy client",
+ 0, clientManager.getNumProxies());
+ }
+ // removeClient(address) should remove the client registered for that address.
+ @Test(timeout = 60000)
+ public void testRemoveClient() throws Exception {
+ SocketAddress address = createSocketAddress(3000);
+ MockProxyClientBuilder builder = new MockProxyClientBuilder();
+ MockProxyClient mockProxyClient = createMockProxyClient(address);
+ builder.provideProxyClient(address, mockProxyClient);
+
+ ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+ assertEquals("There should be no clients in the manager",
+ 0, clientManager.getNumProxies());
+ clientManager.createClient(address);
+ assertEquals("Create client should build the proxy client",
+ 1, clientManager.getNumProxies());
+ clientManager.removeClient(address);
+ assertEquals("Remove should remove proxy client",
+ 0, clientManager.getNumProxies());
+ }
+ // Creating a client should trigger a handshake whose server info is delivered to the listener.
+ @Test(timeout = 60000)
+ public void testCreateClientShouldHandshake() throws Exception {
+ SocketAddress address = createSocketAddress(3000);
+ MockProxyClientBuilder builder = new MockProxyClientBuilder();
+ ServerInfo serverInfo = new ServerInfo();
+ serverInfo.putToOwnerships(runtime.getMethodName() + "_stream",
+ runtime.getMethodName() + "_owner");
+ Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+ createMockProxyClient(address, serverInfo);
+ builder.provideProxyClient(address, mockProxyClient.getLeft());
+
+ final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null);
+ final CountDownLatch doneLatch = new CountDownLatch(1);
+ ProxyListener listener = new ProxyListener() {
+ @Override
+ public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+ resultHolder.set(serverInfo);
+ doneLatch.countDown();
+ }
+ @Override
+ public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+ }
+ };
+
+ ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
+ clientManager.registerProxyListener(listener);
+ assertEquals("There should be no clients in the manager",
+ 0, clientManager.getNumProxies());
+ clientManager.createClient(address);
+ assertEquals("Create client should build the proxy client",
+ 1, clientManager.getNumProxies());
+
+ // When a client is created, it would handshake with that proxy
+ doneLatch.await();
+ assertEquals("Handshake should return server info",
+ serverInfo, resultHolder.get());
+ }
+ // handshake() should reach every host of the host provider and report each host's server info.
+ @Test(timeout = 60000)
+ public void testHandshake() throws Exception {
+ final int numHosts = 3;
+ final int numStreamsPerHost = 3;
+ final int initialPort = 4000;
+
+ MockProxyClientBuilder builder = new MockProxyClientBuilder();
+ Map<SocketAddress, ServerInfo> serverInfoMap =
+ new HashMap<SocketAddress, ServerInfo>();
+ for (int i = 0; i < numHosts; i++) {
+ SocketAddress address = createSocketAddress(initialPort + i);
+ ServerInfo serverInfo = new ServerInfo();
+ for (int j = 0; j < numStreamsPerHost; j++) {
+ serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
+ address.toString());
+ }
+ Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+ createMockProxyClient(address, serverInfo);
+ builder.provideProxyClient(address, mockProxyClient.getLeft());
+ serverInfoMap.put(address, serverInfo);
+ }
+
+ final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
+ final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts);
+ ProxyListener listener = new ProxyListener() {
+ @Override
+ public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+ synchronized (results) {
+ results.put(address, serverInfo);
+ }
+ doneLatch.countDown();
+ }
+
+ @Override
+ public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+ }
+ };
+
+ TestHostProvider rs = new TestHostProvider();
+ ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L);
+ clientManager.registerProxyListener(listener);
+ assertEquals("There should be no clients in the manager",
+ 0, clientManager.getNumProxies());
+ for (int i = 0; i < numHosts; i++) {
+ rs.addHost(createSocketAddress(initialPort + i));
+ }
+ // handshake would handshake with 3 hosts again
+ clientManager.handshake();
+ doneLatch.await();
+ assertEquals("Handshake should return server info",
+ numHosts, results.size());
+ assertTrue("Handshake should get all server infos",
+ Maps.difference(serverInfoMap, results).areEqual());
+ }
+ // Once enabled, periodic handshakes should deliver server info that changed after the first handshake.
+ @Test(timeout = 60000)
+ public void testPeriodicHandshake() throws Exception {
+ final int numHosts = 3;
+ final int numStreamsPerHost = 3;
+ final int initialPort = 5000;
+
+ MockProxyClientBuilder builder = new MockProxyClientBuilder();
+ Map<SocketAddress, ServerInfo> serverInfoMap =
+ new HashMap<SocketAddress, ServerInfo>();
+ Map<SocketAddress, MockServerInfoService> mockServiceMap =
+ new HashMap<SocketAddress, MockServerInfoService>();
+ final Map<SocketAddress, CountDownLatch> hostDoneLatches =
+ new HashMap<SocketAddress, CountDownLatch>();
+ for (int i = 0; i < numHosts; i++) {
+ SocketAddress address = createSocketAddress(initialPort + i);
+ ServerInfo serverInfo = new ServerInfo();
+ for (int j = 0; j < numStreamsPerHost; j++) {
+ serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
+ address.toString());
+ }
+ Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
+ createMockProxyClient(address, serverInfo);
+ builder.provideProxyClient(address, mockProxyClient.getLeft());
+ serverInfoMap.put(address, serverInfo);
+ mockServiceMap.put(address, mockProxyClient.getRight());
+ hostDoneLatches.put(address, new CountDownLatch(2)); // two handshake successes awaited per host
+ }
+
+ final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
+ final CountDownLatch doneLatch = new CountDownLatch(numHosts);
+ ProxyListener listener = new ProxyListener() {
+ @Override
+ public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
+ synchronized (results) {
+ results.put(address, serverInfo);
+ CountDownLatch latch = hostDoneLatches.get(address);
+ if (null != latch) {
+ latch.countDown();
+ }
+ }
+ doneLatch.countDown();
+ }
+
+ @Override
+ public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
+ }
+ };
+
+ TestHostProvider rs = new TestHostProvider();
+ ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L);
+ clientManager.setPeriodicHandshakeEnabled(false);
+ clientManager.registerProxyListener(listener);
+
+ assertEquals("There should be no clients in the manager",
+ 0, clientManager.getNumProxies());
+ for (int i = 0; i < numHosts; i++) {
+ SocketAddress address = createSocketAddress(initialPort + i);
+ rs.addHost(address);
+ clientManager.createClient(address);
+ }
+
+ // make sure the first 3 handshakes going through
+ doneLatch.await();
+
+ assertEquals("Handshake should return server info",
+ numHosts, results.size());
+ assertTrue("Handshake should get all server infos",
+ Maps.difference(serverInfoMap, results).areEqual());
+
+ // update server info
+ for (int i = 0; i < numHosts; i++) {
+ SocketAddress address = createSocketAddress(initialPort + i);
+ ServerInfo serverInfo = new ServerInfo();
+ for (int j = 0; j < numStreamsPerHost; j++) {
+ serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j,
+ address.toString());
+ }
+ MockServerInfoService service = mockServiceMap.get(address);
+ serverInfoMap.put(address, serverInfo);
+ service.updateServerInfo(serverInfo);
+ }
+
+ clientManager.setPeriodicHandshakeEnabled(true);
+ for (int i = 0; i < numHosts; i++) {
+ SocketAddress address = createSocketAddress(initialPort + i);
+ CountDownLatch latch = hostDoneLatches.get(address);
+ latch.await();
+ }
+ synchronized (results) { // periodic handshakes stay enabled and the listener writes results under this lock
+ assertTrue("Periodic handshake should update all server infos",
+ Maps.difference(serverInfoMap, results).areEqual());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
new file mode 100644
index 0000000..f44cddd
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestConsistentHashRoutingService.java
@@ -0,0 +1,417 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.service.DLSocketAddress;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addresses;
+import com.twitter.finagle.ChannelWriteException;
+import com.twitter.finagle.NoBrokersAvailableException;
+import com.twitter.finagle.stats.NullStatsReceiver;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link ConsistentHashRoutingService}.
+ */
+public class TestConsistentHashRoutingService {
+
+ @Test(timeout = 60000)
+ public void testBlackoutHost() throws Exception {
+ TestName name = new TestName();
+ RoutingService routingService = ConsistentHashRoutingService.newBuilder()
+ .serverSet(new NameServerSet(name))
+ .resolveFromName(true)
+ .numReplicas(997)
+ .blackoutSeconds(2)
+ .build();
+
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", 3181);
+ Address address = Addresses.newInetAddress(inetAddress);
+ List<Address> addresses = new ArrayList<Address>(1);
+ addresses.add(address);
+ name.changeAddrs(addresses);
+
+ routingService.startService();
+
+ RoutingService.RoutingContext routingContext =
+ RoutingService.RoutingContext.of(new DefaultRegionResolver());
+
+ String streamName = "test-blackout-host";
+ assertEquals(inetAddress, routingService.getHost(streamName, routingContext));
+ routingService.removeHost(inetAddress, new ChannelWriteException(new IOException("test exception")));
+ try {
+ routingService.getHost(streamName, routingContext);
+ fail("Should fail to get host since no brokers are available");
+ } catch (NoBrokersAvailableException nbae) {
+ // expected
+ }
+
+ TimeUnit.SECONDS.sleep(3);
+ assertEquals(inetAddress, routingService.getHost(streamName, routingContext));
+
+ routingService.stopService();
+ }
+
+ @Test(timeout = 60000)
+ public void testPerformServerSetChangeOnName() throws Exception {
+ TestName name = new TestName();
+ ConsistentHashRoutingService routingService = (ConsistentHashRoutingService)
+ ConsistentHashRoutingService.newBuilder()
+ .serverSet(new NameServerSet(name))
+ .resolveFromName(true)
+ .numReplicas(997)
+ .build();
+
+ int basePort = 3180;
+ int numHosts = 4;
+ List<Address> addresses1 = Lists.newArrayListWithExpectedSize(4);
+ List<Address> addresses2 = Lists.newArrayListWithExpectedSize(4);
+ List<Address> addresses3 = Lists.newArrayListWithExpectedSize(4);
+
+ // fill up the addresses1
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+ Address address = Addresses.newInetAddress(inetAddress);
+ addresses1.add(address);
+ }
+ // fill up the addresses2 - overlap with addresses1
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i);
+ Address address = Addresses.newInetAddress(inetAddress);
+ addresses2.add(address);
+ }
+ // fill up the addresses3 - not overlap with addresses2
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+ Address address = Addresses.newInetAddress(inetAddress);
+ addresses3.add(address);
+ }
+
+ final List<SocketAddress> leftAddresses = Lists.newArrayList();
+ final List<SocketAddress> joinAddresses = Lists.newArrayList();
+
+ RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() {
+ @Override
+ public void onServerLeft(SocketAddress address) {
+ synchronized (leftAddresses) {
+ leftAddresses.add(address);
+ leftAddresses.notifyAll();
+ }
+ }
+
+ @Override
+ public void onServerJoin(SocketAddress address) {
+ synchronized (joinAddresses) {
+ joinAddresses.add(address);
+ joinAddresses.notifyAll();
+ }
+ }
+ };
+
+ routingService.registerListener(routingListener);
+ name.changeAddrs(addresses1);
+
+ routingService.startService();
+
+ synchronized (joinAddresses) {
+ while (joinAddresses.size() < numHosts) {
+ joinAddresses.wait();
+ }
+ }
+
+ // validate 4 nodes joined
+ synchronized (joinAddresses) {
+ assertEquals(numHosts, joinAddresses.size());
+ }
+ synchronized (leftAddresses) {
+ assertEquals(0, leftAddresses.size());
+ }
+ assertEquals(numHosts, routingService.shardId2Address.size());
+ assertEquals(numHosts, routingService.address2ShardId.size());
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+ assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+ int shardId = routingService.address2ShardId.get(inetAddress);
+ SocketAddress sa = routingService.shardId2Address.get(shardId);
+ assertNotNull(sa);
+ assertEquals(inetAddress, sa);
+ }
+
+ // update addresses2 - 2 new hosts joined, 2 old hosts left
+ name.changeAddrs(addresses2);
+ synchronized (joinAddresses) {
+ while (joinAddresses.size() < numHosts + 2) {
+ joinAddresses.wait();
+ }
+ }
+ synchronized (leftAddresses) {
+ while (leftAddresses.size() < numHosts - 2) {
+ leftAddresses.wait();
+ }
+ }
+ assertEquals(numHosts, routingService.shardId2Address.size());
+ assertEquals(numHosts, routingService.address2ShardId.size());
+
+ // first 2 shards should leave
+ for (int i = 0; i < 2; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+ assertFalse(routingService.address2ShardId.containsKey(inetAddress));
+ }
+
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 2 + i);
+ assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+ int shardId = routingService.address2ShardId.get(inetAddress);
+ SocketAddress sa = routingService.shardId2Address.get(shardId);
+ assertNotNull(sa);
+ assertEquals(inetAddress, sa);
+ }
+
+ // update addresses3 - 2 new hosts joined, 2 old hosts left
+ name.changeAddrs(addresses3);
+ synchronized (joinAddresses) {
+ while (joinAddresses.size() < numHosts + 2 + numHosts) {
+ joinAddresses.wait();
+ }
+ }
+ synchronized (leftAddresses) {
+ while (leftAddresses.size() < numHosts - 2 + numHosts) {
+ leftAddresses.wait();
+ }
+ }
+ assertEquals(numHosts, routingService.shardId2Address.size());
+ assertEquals(numHosts, routingService.address2ShardId.size());
+
+ // first 6 shards should leave
+ for (int i = 0; i < 2 + numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+ assertFalse(routingService.address2ShardId.containsKey(inetAddress));
+ }
+ // new 4 shards should exist
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+ assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+ int shardId = routingService.address2ShardId.get(inetAddress);
+ SocketAddress sa = routingService.shardId2Address.get(shardId);
+ assertNotNull(sa);
+ assertEquals(inetAddress, sa);
+ }
+
+ }
+
+ private static class TestServerSetWatcher implements ServerSetWatcher {
+
+ final LinkedBlockingQueue<ImmutableSet<DLSocketAddress>> changeQueue =
+ new LinkedBlockingQueue<ImmutableSet<DLSocketAddress>>();
+ final CopyOnWriteArrayList<ServerSetMonitor> monitors =
+ new CopyOnWriteArrayList<ServerSetMonitor>();
+
+ @Override
+ public void watch(ServerSetMonitor monitor) throws MonitorException {
+ monitors.add(monitor);
+ ImmutableSet<DLSocketAddress> change;
+ while ((change = changeQueue.poll()) != null) {
+ notifyChanges(change);
+ }
+ }
+
+ void notifyChanges(ImmutableSet<DLSocketAddress> addresses) {
+ if (monitors.isEmpty()) {
+ changeQueue.add(addresses);
+ } else {
+ for (ServerSetMonitor monitor : monitors) {
+ monitor.onChange(addresses);
+ }
+ }
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testPerformServerSetChangeOnServerSet() throws Exception {
+ TestServerSetWatcher serverSetWatcher = new TestServerSetWatcher();
+ ConsistentHashRoutingService routingService = new ConsistentHashRoutingService(
+ serverSetWatcher, 997, Integer.MAX_VALUE, NullStatsReceiver.get());
+
+ int basePort = 3180;
+ int numHosts = 4;
+ Set<DLSocketAddress> addresses1 = Sets.newConcurrentHashSet();
+ Set<DLSocketAddress> addresses2 = Sets.newConcurrentHashSet();
+ Set<DLSocketAddress> addresses3 = Sets.newConcurrentHashSet();
+
+ // fill up the addresses1
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+ DLSocketAddress dsa = new DLSocketAddress(i, inetAddress);
+ addresses1.add(dsa);
+ }
+ // fill up the addresses2 - overlap with addresses1
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i);
+ DLSocketAddress dsa = new DLSocketAddress(i + 2, inetAddress);
+ addresses2.add(dsa);
+ }
+ // fill up the addresses3 - not overlap with addresses2
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+ DLSocketAddress dsa = new DLSocketAddress(i, inetAddress);
+ addresses3.add(dsa);
+ }
+
+ final List<SocketAddress> leftAddresses = Lists.newArrayList();
+ final List<SocketAddress> joinAddresses = Lists.newArrayList();
+
+ RoutingService.RoutingListener routingListener = new RoutingService.RoutingListener() {
+ @Override
+ public void onServerLeft(SocketAddress address) {
+ synchronized (leftAddresses) {
+ leftAddresses.add(address);
+ leftAddresses.notifyAll();
+ }
+ }
+
+ @Override
+ public void onServerJoin(SocketAddress address) {
+ synchronized (joinAddresses) {
+ joinAddresses.add(address);
+ joinAddresses.notifyAll();
+ }
+ }
+ };
+
+ routingService.registerListener(routingListener);
+ serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses1));
+
+ routingService.startService();
+
+ synchronized (joinAddresses) {
+ while (joinAddresses.size() < numHosts) {
+ joinAddresses.wait();
+ }
+ }
+
+ // validate 4 nodes joined
+ synchronized (joinAddresses) {
+ assertEquals(numHosts, joinAddresses.size());
+ }
+ synchronized (leftAddresses) {
+ assertEquals(0, leftAddresses.size());
+ }
+ assertEquals(numHosts, routingService.shardId2Address.size());
+ assertEquals(numHosts, routingService.address2ShardId.size());
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+ assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+ int shardId = routingService.address2ShardId.get(inetAddress);
+ assertEquals(i, shardId);
+ SocketAddress sa = routingService.shardId2Address.get(shardId);
+ assertNotNull(sa);
+ assertEquals(inetAddress, sa);
+ }
+
+ // update addresses2 - 2 new hosts joined, 2 old hosts left
+ serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses2));
+ synchronized (joinAddresses) {
+ while (joinAddresses.size() < numHosts + 2) {
+ joinAddresses.wait();
+ }
+ }
+ synchronized (leftAddresses) {
+ while (leftAddresses.size() < 2) {
+ leftAddresses.wait();
+ }
+ }
+
+ assertEquals(numHosts + 2, routingService.shardId2Address.size());
+ assertEquals(numHosts + 2, routingService.address2ShardId.size());
+ // first 2 shards should not leave
+ for (int i = 0; i < 2; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + i);
+ assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+ int shardId = routingService.address2ShardId.get(inetAddress);
+ assertEquals(i, shardId);
+ SocketAddress sa = routingService.shardId2Address.get(shardId);
+ assertNotNull(sa);
+ assertEquals(inetAddress, sa);
+ }
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + i);
+ assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+ int shardId = routingService.address2ShardId.get(inetAddress);
+ assertEquals(i + 2, shardId);
+ SocketAddress sa = routingService.shardId2Address.get(shardId);
+ assertNotNull(sa);
+ assertEquals(inetAddress, sa);
+ }
+
+ // update addresses3
+ serverSetWatcher.notifyChanges(ImmutableSet.copyOf(addresses3));
+ synchronized (joinAddresses) {
+ while (joinAddresses.size() < numHosts + 2 + numHosts) {
+ joinAddresses.wait();
+ }
+ }
+ synchronized (leftAddresses) {
+ while (leftAddresses.size() < 2 + numHosts) {
+ leftAddresses.wait();
+ }
+ }
+ assertEquals(numHosts + 2, routingService.shardId2Address.size());
+ assertEquals(numHosts + 2, routingService.address2ShardId.size());
+
+ // first 4 shards should leave
+ for (int i = 0; i < numHosts; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + 10 + i);
+ assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+ int shardId = routingService.address2ShardId.get(inetAddress);
+ assertEquals(i, shardId);
+ SocketAddress sa = routingService.shardId2Address.get(shardId);
+ assertNotNull(sa);
+ assertEquals(inetAddress, sa);
+ }
+ // the other 2 shards should be still there
+ for (int i = 0; i < 2; i++) {
+ InetSocketAddress inetAddress = new InetSocketAddress("127.0.0.1", basePort + numHosts + 2 + i);
+ assertTrue(routingService.address2ShardId.containsKey(inetAddress));
+ int shardId = routingService.address2ShardId.get(inetAddress);
+ assertEquals(numHosts + i, shardId);
+ SocketAddress sa = routingService.shardId2Address.get(shardId);
+ assertNotNull(sa);
+ assertEquals(inetAddress, sa);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
new file mode 100644
index 0000000..59665b9
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestInetNameResolution.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import com.google.common.collect.ImmutableSet;
+import com.twitter.common.net.pool.DynamicHostSet;
+import com.twitter.thrift.Endpoint;
+import com.twitter.thrift.ServiceInstance;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for `inet` name resolution.
+ */
+public class TestInetNameResolution {
+
+ private static final Logger logger = LoggerFactory.getLogger(TestRoutingService.class);
+
+ @Test(timeout = 10000)
+ public void testInetNameResolution() throws Exception {
+ String nameStr = "inet!127.0.0.1:3181";
+ final CountDownLatch resolved = new CountDownLatch(1);
+ final AtomicBoolean validationFailed = new AtomicBoolean(false);
+
+ NameServerSet serverSet = new NameServerSet(nameStr);
+ serverSet.watch(new DynamicHostSet.HostChangeMonitor<ServiceInstance>() {
+ @Override
+ public void onChange(ImmutableSet<ServiceInstance> hostSet) {
+ if (hostSet.size() > 1) {
+ logger.error("HostSet has more elements than expected {}", hostSet);
+ validationFailed.set(true);
+ resolved.countDown();
+ } else if (hostSet.size() == 1) {
+ ServiceInstance serviceInstance = hostSet.iterator().next();
+ Endpoint endpoint = serviceInstance.getAdditionalEndpoints().get("thrift");
+ InetSocketAddress address = new InetSocketAddress(endpoint.getHost(), endpoint.getPort());
+ if (endpoint.getPort() != 3181) {
+ logger.error("Port does not match the expected port {}", endpoint.getPort());
+ validationFailed.set(true);
+ } else if (!address.getAddress().getHostAddress().equals("127.0.0.1")) {
+ logger.error("Host address does not match the expected address {}",
+ address.getAddress().getHostAddress());
+ validationFailed.set(true);
+ }
+ resolved.countDown();
+ }
+ }
+ });
+
+ resolved.await();
+ Assert.assertEquals(false, validationFailed.get());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
new file mode 100644
index 0000000..151663e
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRegionsRoutingService.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.collect.Sets;
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import org.apache.distributedlog.thrift.service.StatusCode;
+import com.twitter.finagle.NoBrokersAvailableException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+
+/**
+ * Test Case for {@link RegionsRoutingService}.
+ */
+public class TestRegionsRoutingService {
+
+ @Test(timeout = 60000)
+ public void testRoutingListener() throws Exception {
+ int numRoutingServices = 5;
+ RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices];
+ Set<SocketAddress> hosts = new HashSet<SocketAddress>();
+ Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>();
+ for (int i = 0; i < numRoutingServices; i++) {
+ String finagleNameStr = "inet!127.0.0.1:" + (3181 + i);
+ routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr);
+ SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i);
+ hosts.add(address);
+ regionMap.put(address, "region-" + i);
+ }
+
+ final CountDownLatch doneLatch = new CountDownLatch(numRoutingServices);
+ final AtomicInteger numHostsLeft = new AtomicInteger(0);
+ final Set<SocketAddress> jointHosts = new HashSet<SocketAddress>();
+ RegionsRoutingService regionsRoutingService =
+ RegionsRoutingService.newBuilder()
+ .routingServiceBuilders(routingServiceBuilders)
+ .resolver(new DefaultRegionResolver(regionMap))
+ .build();
+ regionsRoutingService.registerListener(new RoutingService.RoutingListener() {
+ @Override
+ public void onServerLeft(SocketAddress address) {
+ numHostsLeft.incrementAndGet();
+ }
+
+ @Override
+ public void onServerJoin(SocketAddress address) {
+ jointHosts.add(address);
+ doneLatch.countDown();
+ }
+ });
+
+ regionsRoutingService.startService();
+
+ doneLatch.await();
+
+ assertEquals(numRoutingServices, jointHosts.size());
+ assertEquals(0, numHostsLeft.get());
+ assertTrue(Sets.difference(hosts, jointHosts).immutableCopy().isEmpty());
+ }
+
+ @Test(timeout = 60000)
+ public void testGetHost() throws Exception {
+ int numRoutingServices = 3;
+ RoutingService.Builder[] routingServiceBuilders = new RoutingService.Builder[numRoutingServices];
+ Map<SocketAddress, String> regionMap = new HashMap<SocketAddress, String>();
+ for (int i = 0; i < numRoutingServices; i++) {
+ String finagleNameStr = "inet!127.0.0.1:" + (3181 + i);
+ routingServiceBuilders[i] = RoutingUtils.buildRoutingService(finagleNameStr);
+ SocketAddress address = new InetSocketAddress("127.0.0.1", 3181 + i);
+ regionMap.put(address, "region-" + i);
+ }
+
+ RegionsRoutingService regionsRoutingService =
+ RegionsRoutingService.newBuilder()
+ .resolver(new DefaultRegionResolver(regionMap))
+ .routingServiceBuilders(routingServiceBuilders)
+ .build();
+ regionsRoutingService.startService();
+
+ RoutingService.RoutingContext routingContext =
+ RoutingService.RoutingContext.of(new DefaultRegionResolver())
+ .addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION);
+ assertEquals(new InetSocketAddress("127.0.0.1", 3181),
+ regionsRoutingService.getHost("any", routingContext));
+
+ routingContext =
+ RoutingService.RoutingContext.of(new DefaultRegionResolver())
+ .addTriedHost(new InetSocketAddress("127.0.0.1", 3181), StatusCode.WRITE_EXCEPTION);
+ assertEquals(new InetSocketAddress("127.0.0.1", 3182),
+ regionsRoutingService.getHost("any", routingContext));
+
+ // add 3182 to routing context as tried host
+ routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3182), StatusCode.WRITE_EXCEPTION);
+ assertEquals(new InetSocketAddress("127.0.0.1", 3183),
+ regionsRoutingService.getHost("any", routingContext));
+
+ // add 3183 to routing context as tried host
+ routingContext.addTriedHost(new InetSocketAddress("127.0.0.1", 3183), StatusCode.WRITE_EXCEPTION);
+ try {
+ regionsRoutingService.getHost("any", routingContext);
+ fail("Should fail to get host since all regions are tried.");
+ } catch (NoBrokersAvailableException nbae) {
+ // expected
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
new file mode 100644
index 0000000..d2d61a9
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/routing/TestRoutingService.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.routing;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
+import com.twitter.finagle.Address;
+import com.twitter.finagle.Addresses;
+import com.twitter.finagle.addr.WeightedAddress;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Case for {@link RoutingService}.
+ */
+@RunWith(Parameterized.class)
+public class TestRoutingService {
+
+ static final Logger LOG = LoggerFactory.getLogger(TestRoutingService.class);
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> configs() {
+ ArrayList<Object[]> list = new ArrayList<Object[]>();
+ for (int i = 0; i <= 1; i++) {
+ for (int j = 0; j <= 1; j++) {
+ for (int k = 0; k <= 1; k++) {
+ list.add(new Boolean[] {i == 1, j == 1, k == 1});
+ }
+ }
+ }
+ return list;
+ }
+
+ private final boolean consistentHash;
+ private final boolean weightedAddresses;
+ private final boolean asyncResolution;
+
+ public TestRoutingService(boolean consistentHash, boolean weightedAddresses, boolean asyncResolution) {
+ this.consistentHash = consistentHash;
+ this.weightedAddresses = weightedAddresses;
+ this.asyncResolution = asyncResolution;
+ }
+
+ private List<Address> getAddresses(boolean weightedAddresses) {
+ ArrayList<Address> addresses = new ArrayList<Address>();
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.2", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.3", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.4", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.5", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.6", 3181)));
+ addresses.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.7", 3181)));
+
+ if (weightedAddresses) {
+ ArrayList<Address> wAddresses = new ArrayList<Address>();
+ for (Address address: addresses) {
+ wAddresses.add(WeightedAddress.apply(address, 1.0));
+ }
+ return wAddresses;
+ } else {
+ return addresses;
+ }
+ }
+
+ private void testRoutingServiceHelper(boolean consistentHash,
+ boolean weightedAddresses,
+ boolean asyncResolution)
+ throws Exception {
+ ExecutorService executorService = null;
+ final List<Address> addresses = getAddresses(weightedAddresses);
+ final TestName name = new TestName();
+ RoutingService routingService;
+ if (consistentHash) {
+ routingService = ConsistentHashRoutingService.newBuilder()
+ .serverSet(new NameServerSet(name))
+ .resolveFromName(true)
+ .numReplicas(997)
+ .build();
+ } else {
+ routingService = ServerSetRoutingService.newServerSetRoutingServiceBuilder()
+ .serverSetWatcher(new TwitterServerSetWatcher(new NameServerSet(name), true)).build();
+ }
+
+ if (asyncResolution) {
+ executorService = Executors.newSingleThreadExecutor();
+ executorService.submit(new Runnable() {
+ @Override
+ public void run() {
+ name.changeAddrs(addresses);
+ }
+ });
+ } else {
+ name.changeAddrs(addresses);
+ }
+ routingService.startService();
+
+ HashSet<SocketAddress> mapping = new HashSet<SocketAddress>();
+
+ for (int i = 0; i < 1000; i++) {
+ for (int j = 0; j < 5; j++) {
+ String stream = "TestStream-" + i + "-" + j;
+ mapping.add(routingService.getHost(stream,
+ RoutingService.RoutingContext.of(new DefaultRegionResolver())));
+ }
+ }
+
+ assertEquals(mapping.size(), addresses.size());
+
+ if (null != executorService) {
+ executorService.shutdown();
+ }
+
+ }
+
+ @Test(timeout = 5000)
+ public void testRoutingService() throws Exception {
+ testRoutingServiceHelper(this.consistentHash, this.weightedAddresses, this.asyncResolution);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
new file mode 100644
index 0000000..ab0cb58
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/client/speculative/TestDefaultSpeculativeRequestExecutionPolicy.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.client.speculative;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+import com.twitter.util.CountDownLatch;
+import com.twitter.util.Future;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/**
+ * Test {@link DefaultSpeculativeRequestExecutionPolicy}.
+ */
+public class TestDefaultSpeculativeRequestExecutionPolicy {
+
+    /** Number of calls the mocked executor receives before a scenario is considered done. */
+    private static final int NUM_REQUESTS = 3;
+
+    /**
+     * Constructing a policy with a negative third argument (the backoff
+     * multiplier, going by the test name) must fail with
+     * {@link IllegalArgumentException}.
+     */
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testInvalidBackoffMultiplier() throws Exception {
+        new DefaultSpeculativeRequestExecutionPolicy(100, 200, -1);
+    }
+
+    /**
+     * Constructing a policy with {@code Integer.MAX_VALUE} as the second
+     * argument (the max speculative timeout, going by the test name) and a
+     * multiplier of 2 must fail with {@link IllegalArgumentException}.
+     * NOTE(review): presumably rejected because max * multiplier would
+     * overflow an int -- confirm against the policy constructor.
+     */
+    @Test(timeout = 20000, expected = IllegalArgumentException.class)
+    public void testInvalidMaxSpeculativeTimeout() throws Exception {
+        new DefaultSpeculativeRequestExecutionPolicy(100, Integer.MAX_VALUE, 2);
+    }
+
+    /**
+     * After three speculative requests with arguments (10, 10000, 2) the
+     * policy is expected to report a next timeout of 40.
+     */
+    @Test(timeout = 20000)
+    public void testSpeculativeRequests() throws Exception {
+        DefaultSpeculativeRequestExecutionPolicy policy =
+            new DefaultSpeculativeRequestExecutionPolicy(10, 10000, 2);
+
+        runSpeculativeRequests(policy);
+
+        assertEquals(40, policy.getNextSpeculativeRequestTimeout());
+    }
+
+    /**
+     * Same scenario as {@link #testSpeculativeRequests()} but with the second
+     * constructor argument lowered to 15; the reported next timeout is
+     * expected to be 15.
+     */
+    @Test(timeout = 20000)
+    public void testSpeculativeRequestsWithMaxTimeout() throws Exception {
+        DefaultSpeculativeRequestExecutionPolicy policy =
+            new DefaultSpeculativeRequestExecutionPolicy(10, 15, 2);
+
+        runSpeculativeRequests(policy);
+
+        assertEquals(15, policy.getNextSpeculativeRequestTimeout());
+    }
+
+    /**
+     * Drives {@code policy} against a mocked {@link SpeculativeRequestExecutor}
+     * and blocks until the mock has been invoked {@link #NUM_REQUESTS} times.
+     *
+     * <p>The mock answers {@code Future.value(true)} for every call before the
+     * last one and {@code Future.value(false)} for the last one. The scheduler
+     * created for the run is always shut down, so no worker thread outlives
+     * the test.
+     *
+     * @param policy the policy under test
+     * @throws Exception if waiting for the speculative requests fails
+     */
+    private static void runSpeculativeRequests(DefaultSpeculativeRequestExecutionPolicy policy)
+            throws Exception {
+        SpeculativeRequestExecutor executor = mock(SpeculativeRequestExecutor.class);
+
+        final AtomicInteger callCount = new AtomicInteger(0);
+        final CountDownLatch latch = new CountDownLatch(NUM_REQUESTS);
+
+        Mockito.doAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                try {
+                    return Future.value(callCount.incrementAndGet() < NUM_REQUESTS);
+                } finally {
+                    // Count down even if building the answer throws, so the test cannot hang.
+                    latch.countDown();
+                }
+            }
+        }).when(executor).issueSpeculativeRequest();
+
+        ScheduledExecutorService executorService =
+            Executors.newSingleThreadScheduledExecutor();
+        try {
+            policy.initiateSpeculativeRequest(executorService, executor);
+            latch.await();
+        } finally {
+            // Release the scheduler thread on success and on failure alike.
+            executorService.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
new file mode 100644
index 0000000..d2df9a5
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/java/org/apache/distributedlog/service/TestDistributedLogClientBuilder.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.distributedlog.service;
+
+import static org.junit.Assert.assertFalse;
+
+import com.twitter.finagle.builder.ClientBuilder;
+import com.twitter.finagle.thrift.ClientId$;
+import com.twitter.util.Duration;
+import org.junit.Test;
+
+/**
+ * Test Case of {@link org.apache.distributedlog.service.DistributedLogClientBuilder}.
+ */
+public class TestDistributedLogClientBuilder {
+
+    /**
+     * Calls {@code build()} twice on one configured builder and checks that
+     * the two returned clients are not the same object.
+     *
+     * @throws Exception if building either client fails
+     */
+    @Test(timeout = 60000)
+    public void testBuildClientsFromSameBuilder() throws Exception {
+        // Both connect timeouts use the same one-second value.
+        final Duration oneSecond = Duration.fromSeconds(1);
+        final Duration tenSeconds = Duration.fromSeconds(10);
+
+        DistributedLogClientBuilder sharedBuilder = DistributedLogClientBuilder.newBuilder()
+            .name("build-clients-from-same-builder")
+            .clientId(ClientId$.MODULE$.apply("test-builder"))
+            .finagleNameStr("inet!127.0.0.1:7001")
+            .streamNameRegex(".*")
+            .handshakeWithClientInfo(true)
+            .clientBuilder(ClientBuilder.get()
+                .hostConnectionLimit(1)
+                .connectTimeout(oneSecond)
+                .tcpConnectTimeout(oneSecond)
+                .requestTimeout(tenSeconds));
+
+        DistributedLogClient first = sharedBuilder.build();
+        DistributedLogClient second = sharedBuilder.build();
+
+        assertFalse(first == second);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-proxy-client/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/distributedlog-proxy-client/src/test/resources/log4j.properties b/distributedlog-proxy-client/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3e51059
--- /dev/null
+++ b/distributedlog-proxy-client/src/test/resources/log4j.properties
@@ -0,0 +1,51 @@
+#/**
+# * Licensed to the Apache Software Foundation (ASF) under one
+# * or more contributor license agreements. See the NOTICE file
+# * distributed with this work for additional information
+# * regarding copyright ownership. The ASF licenses this file
+# * to you under the Apache License, Version 2.0 (the
+# * "License"); you may not use this file except in compliance
+# * with the License. You may obtain a copy of the License at
+# *
+# * http://www.apache.org/licenses/LICENSE-2.0
+# *
+# * Unless required by applicable law or agreed to in writing, software
+# * distributed under the License is distributed on an "AS IS" BASIS,
+# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * See the License for the specific language governing permissions and
+# * limitations under the License.
+# */
+
+#
+# DistributedLog Logging Configuration
+#
+
+# Root logger: INFO and above, written to the console
+log4j.rootLogger=INFO, CONSOLE
+
+#disable zookeeper logging
+log4j.logger.org.apache.zookeeper=OFF
+#Set the bookkeeper level to info
+log4j.logger.org.apache.bookkeeper=INFO
+
+log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
+log4j.appender.CONSOLE.Threshold=INFO
+log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
+log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+# Add ROLLINGFILE to rootLogger to get log file output
+# Log INFO level and above messages to a log file
+#log4j.appender.ROLLINGFILE=org.apache.log4j.DailyRollingFileAppender
+#log4j.appender.ROLLINGFILE.Threshold=INFO
+#log4j.appender.ROLLINGFILE.File=distributedlog.log
+#log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
+#log4j.appender.ROLLINGFILE.DatePattern='.'yyyy-MM-dd-HH-mm
+#log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.Threshold=TRACE
+log4j.appender.R.File=target/error.log
+log4j.appender.R.MaxFileSize=200MB
+log4j.appender.R.MaxBackupIndex=7
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n