You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/06/12 15:45:33 UTC
[26/30] incubator-distributedlog git commit: DL-205: Remove
StatusCode dependency on DLException
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
deleted file mode 100644
index 0e2a152..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/DistributedLogClientBuilder.java
+++ /dev/null
@@ -1,608 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.service;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Optional;
-import com.twitter.common.zookeeper.ServerSet;
-import org.apache.distributedlog.client.ClientConfig;
-import org.apache.distributedlog.client.DistributedLogClientImpl;
-import org.apache.distributedlog.client.monitor.MonitorServiceClient;
-import org.apache.distributedlog.client.proxy.ClusterClient;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.resolver.RegionResolver;
-import org.apache.distributedlog.client.routing.RegionsRoutingService;
-import org.apache.distributedlog.client.routing.RoutingService;
-import org.apache.distributedlog.client.routing.RoutingUtils;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.finagle.Name;
-import com.twitter.finagle.Resolver$;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.ThriftMux;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ThriftClientFramedCodec;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Duration;
-import java.net.SocketAddress;
-import java.net.URI;
-import java.util.Random;
-import org.apache.commons.lang.StringUtils;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Option;
-
-/**
- * Builder to build {@link DistributedLogClient}.
- */
-public final class DistributedLogClientBuilder {
-
- private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientBuilder.class);
-
- private static final Random random = new Random(System.currentTimeMillis());
-
- private String name = null;
- private ClientId clientId = null;
- private RoutingService.Builder routingServiceBuilder = null;
- private ClientBuilder clientBuilder = null;
- private String serverRoutingServiceFinagleName = null;
- private StatsReceiver statsReceiver = new NullStatsReceiver();
- private StatsReceiver streamStatsReceiver = new NullStatsReceiver();
- private ClientConfig clientConfig = new ClientConfig();
- private boolean enableRegionStats = false;
- private final RegionResolver regionResolver = new DefaultRegionResolver();
-
- /**
- * Create a client builder.
- *
- * @return client builder
- */
- public static DistributedLogClientBuilder newBuilder() {
- return new DistributedLogClientBuilder();
- }
-
- /**
- * Create a new client builder from an existing {@code builder}.
- *
- * @param builder the existing builder.
- * @return a new client builder.
- */
- public static DistributedLogClientBuilder newBuilder(DistributedLogClientBuilder builder) {
- DistributedLogClientBuilder newBuilder = new DistributedLogClientBuilder();
- newBuilder.name = builder.name;
- newBuilder.clientId = builder.clientId;
- newBuilder.clientBuilder = builder.clientBuilder;
- newBuilder.routingServiceBuilder = builder.routingServiceBuilder;
- newBuilder.statsReceiver = builder.statsReceiver;
- newBuilder.streamStatsReceiver = builder.streamStatsReceiver;
- newBuilder.enableRegionStats = builder.enableRegionStats;
- newBuilder.serverRoutingServiceFinagleName = builder.serverRoutingServiceFinagleName;
- newBuilder.clientConfig = ClientConfig.newConfig(builder.clientConfig);
- return newBuilder;
- }
-
- // private constructor
- private DistributedLogClientBuilder() {}
-
- /**
- * Client Name.
- *
- * @param name
- * client name
- * @return client builder.
- */
- public DistributedLogClientBuilder name(String name) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.name = name;
- return newBuilder;
- }
-
- /**
- * Client ID.
- *
- * @param clientId
- * client id
- * @return client builder.
- */
- public DistributedLogClientBuilder clientId(ClientId clientId) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientId = clientId;
- return newBuilder;
- }
-
- /**
- * Serverset to access proxy services.
- *
- * @param serverSet
- * server set.
- * @return client builder.
- */
- public DistributedLogClientBuilder serverSet(ServerSet serverSet) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(serverSet);
- newBuilder.enableRegionStats = false;
- return newBuilder;
- }
-
- /**
- * Server Sets to access proxy services.
- *
- * <p>The <i>local</i> server set will be tried first then <i>remotes</i>.
- *
- * @param local local server set.
- * @param remotes remote server sets.
- * @return client builder.
- */
- public DistributedLogClientBuilder serverSets(ServerSet local, ServerSet...remotes) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1];
- builders[0] = RoutingUtils.buildRoutingService(local);
- for (int i = 1; i < builders.length; i++) {
- builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]);
- }
- newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder()
- .resolver(regionResolver)
- .routingServiceBuilders(builders);
- newBuilder.enableRegionStats = remotes.length > 0;
- return newBuilder;
- }
-
- /**
- * Name to access proxy services.
- *
- * @param finagleNameStr
- * finagle name string.
- * @return client builder.
- */
- public DistributedLogClientBuilder finagleNameStr(String finagleNameStr) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
- newBuilder.enableRegionStats = false;
- return newBuilder;
- }
-
- /**
- * Finagle name strs to access proxy services.
- *
- * <p>The <i>local</i> finalge name str will be tried first, then <i>remotes</i>.
- *
- * @param local local server set.
- * @param remotes remote server sets.
- * @return client builder.
- */
- public DistributedLogClientBuilder finagleNameStrs(String local, String...remotes) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- RoutingService.Builder[] builders = new RoutingService.Builder[remotes.length + 1];
- builders[0] = RoutingUtils.buildRoutingService(local);
- for (int i = 1; i < builders.length; i++) {
- builders[i] = RoutingUtils.buildRoutingService(remotes[i - 1]);
- }
- newBuilder.routingServiceBuilder = RegionsRoutingService.newBuilder()
- .routingServiceBuilders(builders)
- .resolver(regionResolver);
- newBuilder.enableRegionStats = remotes.length > 0;
- return newBuilder;
- }
-
- /**
- * URI to access proxy services.
- *
- * <p>Assuming the write proxies are announced under `.write_proxy` of the provided namespace uri.
- * The builder will convert the dl uri (e.g. distributedlog://{zkserver}/path/to/namespace) to
- * zookeeper serverset based finagle name str (`zk!{zkserver}!/path/to/namespace/.write_proxy`)
- *
- * @param uri namespace uri to access the serverset of write proxies
- * @return distributedlog builder
- */
- public DistributedLogClientBuilder uri(URI uri) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- String zkServers = uri.getAuthority().replace(";", ",");
- String[] zkServerList = StringUtils.split(zkServers, ',');
- String finagleNameStr = String.format(
- "zk!%s!%s/.write_proxy",
- zkServerList[random.nextInt(zkServerList.length)], // zk server
- uri.getPath());
- newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(finagleNameStr);
- newBuilder.enableRegionStats = false;
- return newBuilder;
- }
-
- /**
- * Address of write proxy to connect.
- *
- * @param address
- * write proxy address.
- * @return client builder.
- */
- public DistributedLogClientBuilder host(SocketAddress address) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(address);
- newBuilder.enableRegionStats = false;
- return newBuilder;
- }
-
- private DistributedLogClientBuilder routingServiceBuilder(RoutingService.Builder builder) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.routingServiceBuilder = builder;
- newBuilder.enableRegionStats = false;
- return newBuilder;
- }
-
- /**
- * Routing Service to access proxy services.
- *
- * @param routingService
- * routing service
- * @return client builder.
- */
- @VisibleForTesting
- public DistributedLogClientBuilder routingService(RoutingService routingService) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.routingServiceBuilder = RoutingUtils.buildRoutingService(routingService);
- newBuilder.enableRegionStats = false;
- return newBuilder;
- }
-
- /**
- * Stats receiver to expose client stats.
- *
- * @param statsReceiver
- * stats receiver.
- * @return client builder.
- */
- public DistributedLogClientBuilder statsReceiver(StatsReceiver statsReceiver) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.statsReceiver = statsReceiver;
- return newBuilder;
- }
-
- /**
- * Stream Stats Receiver to expose per stream stats.
- *
- * @param streamStatsReceiver
- * stream stats receiver
- * @return client builder.
- */
- public DistributedLogClientBuilder streamStatsReceiver(StatsReceiver streamStatsReceiver) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.streamStatsReceiver = streamStatsReceiver;
- return newBuilder;
- }
-
- /**
- * Set underlying finagle client builder.
- *
- * @param builder
- * finagle client builder.
- * @return client builder.
- */
- public DistributedLogClientBuilder clientBuilder(ClientBuilder builder) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientBuilder = builder;
- return newBuilder;
- }
-
- /**
- * Backoff time when redirecting to an already retried host.
- *
- * @param ms
- * backoff time.
- * @return client builder.
- */
- public DistributedLogClientBuilder redirectBackoffStartMs(int ms) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setRedirectBackoffStartMs(ms);
- return newBuilder;
- }
-
- /**
- * Max backoff time when redirecting to an already retried host.
- *
- * @param ms
- * backoff time.
- * @return client builder.
- */
- public DistributedLogClientBuilder redirectBackoffMaxMs(int ms) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setRedirectBackoffMaxMs(ms);
- return newBuilder;
- }
-
- /**
- * Max redirects that is allowed per request.
- *
- * <p>If <i>redirects</i> are exhausted, fail the request immediately.
- *
- * @param redirects
- * max redirects allowed before failing a request.
- * @return client builder.
- */
- public DistributedLogClientBuilder maxRedirects(int redirects) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setMaxRedirects(redirects);
- return newBuilder;
- }
-
- /**
- * Timeout per request in millis.
- *
- * @param timeoutMs
- * timeout per request in millis.
- * @return client builder.
- */
- public DistributedLogClientBuilder requestTimeoutMs(int timeoutMs) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setRequestTimeoutMs(timeoutMs);
- return newBuilder;
- }
-
- /**
- * Set thriftmux enabled.
- *
- * @param enabled
- * is thriftmux enabled
- * @return client builder.
- */
- public DistributedLogClientBuilder thriftmux(boolean enabled) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setThriftMux(enabled);
- return newBuilder;
- }
-
- /**
- * Set failfast stream exception handling enabled.
- *
- * @param enabled
- * is failfast exception handling enabled
- * @return client builder.
- */
- public DistributedLogClientBuilder streamFailfast(boolean enabled) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setStreamFailfast(enabled);
- return newBuilder;
- }
-
- /**
- * Set the regex to match stream names that the client cares about.
- *
- * @param nameRegex
- * stream name regex
- * @return client builder
- */
- public DistributedLogClientBuilder streamNameRegex(String nameRegex) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setStreamNameRegex(nameRegex);
- return newBuilder;
- }
-
- /**
- * Whether to use the new handshake endpoint to exchange ownership cache.
- *
- * <p>Enable this when the servers are updated to support handshaking with client info.
- *
- * @param enabled
- * new handshake endpoint is enabled.
- * @return client builder.
- */
- public DistributedLogClientBuilder handshakeWithClientInfo(boolean enabled) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setHandshakeWithClientInfo(enabled);
- return newBuilder;
- }
-
- /**
- * Set the periodic handshake interval in milliseconds.
- *
- * <p>Every <code>intervalMs</code>, the DL client will handshake with existing proxies again.
- * If the interval is less than ownership sync interval, the handshake won't sync ownerships. Otherwise, it will.
- *
- * @see #periodicOwnershipSyncIntervalMs(long)
- * @param intervalMs
- * handshake interval
- * @return client builder.
- */
- public DistributedLogClientBuilder periodicHandshakeIntervalMs(long intervalMs) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setPeriodicHandshakeIntervalMs(intervalMs);
- return newBuilder;
- }
-
- /**
- * Set the periodic ownership sync interval in milliseconds.
- *
- * <p>If periodic handshake is enabled, the handshake will sync ownership if the elapsed time is larger than
- * sync interval.
- *
- * @see #periodicHandshakeIntervalMs(long)
- * @param intervalMs
- * interval that handshake should sync ownerships.
- * @return client builder
- */
- public DistributedLogClientBuilder periodicOwnershipSyncIntervalMs(long intervalMs) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setPeriodicOwnershipSyncIntervalMs(intervalMs);
- return newBuilder;
- }
-
- /**
- * Enable/Disable periodic dumping ownership cache.
- *
- * @param enabled
- * flag to enable/disable periodic dumping ownership cache
- * @return client builder.
- */
- public DistributedLogClientBuilder periodicDumpOwnershipCache(boolean enabled) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setPeriodicDumpOwnershipCacheEnabled(enabled);
- return newBuilder;
- }
-
- /**
- * Set periodic dumping ownership cache interval.
- *
- * @param intervalMs
- * interval on dumping ownership cache, in millis.
- * @return client builder
- */
- public DistributedLogClientBuilder periodicDumpOwnershipCacheIntervalMs(long intervalMs) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setPeriodicDumpOwnershipCacheIntervalMs(intervalMs);
- return newBuilder;
- }
-
- /**
- * Enable handshake tracing.
- *
- * @param enabled
- * flag to enable/disable handshake tracing
- * @return client builder
- */
- public DistributedLogClientBuilder handshakeTracing(boolean enabled) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setHandshakeTracingEnabled(enabled);
- return newBuilder;
- }
-
- /**
- * Enable checksum on requests to the proxy.
- *
- * @param enabled
- * flag to enable/disable checksum
- * @return client builder
- */
- public DistributedLogClientBuilder checksum(boolean enabled) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig.setChecksumEnabled(enabled);
- return newBuilder;
- }
-
- /**
- * Configure the finagle name string for the server-side routing service.
- *
- * @param nameStr name string of the server-side routing service
- * @return client builder
- */
- public DistributedLogClientBuilder serverRoutingServiceFinagleNameStr(String nameStr) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.serverRoutingServiceFinagleName = nameStr;
- return newBuilder;
- }
-
- DistributedLogClientBuilder clientConfig(ClientConfig clientConfig) {
- DistributedLogClientBuilder newBuilder = newBuilder(this);
- newBuilder.clientConfig = ClientConfig.newConfig(clientConfig);
- return newBuilder;
- }
-
- /**
- * Build distributedlog client.
- *
- * @return distributedlog client.
- */
- public DistributedLogClient build() {
- return buildClient();
- }
-
- /**
- * Build monitor service client.
- *
- * @return monitor service client.
- */
- public MonitorServiceClient buildMonitorClient() {
-
- return buildClient();
- }
-
- @SuppressWarnings("unchecked")
- ClusterClient buildServerRoutingServiceClient(String serverRoutingServiceFinagleName) {
- ClientBuilder builder = this.clientBuilder;
- if (null == builder) {
- builder = ClientBuilder.get()
- .tcpConnectTimeout(Duration.fromMilliseconds(200))
- .connectTimeout(Duration.fromMilliseconds(200))
- .requestTimeout(Duration.fromSeconds(1))
- .retries(20);
- if (!clientConfig.getThriftMux()) {
- builder = builder.hostConnectionLimit(1);
- }
- }
- if (clientConfig.getThriftMux()) {
- builder = builder.stack(ThriftMux.client().withClientId(clientId));
- } else {
- builder = builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
- }
-
- Name name;
- try {
- name = Resolver$.MODULE$.eval(serverRoutingServiceFinagleName);
- } catch (Exception exc) {
- logger.error("Exception in Resolver.eval for name {}", serverRoutingServiceFinagleName, exc);
- throw new RuntimeException(exc);
- }
-
- // builder the client
- Service<ThriftClientRequest, byte[]> client =
- ClientBuilder.safeBuildFactory(
- builder.dest(name).reportTo(statsReceiver.scope("routing"))
- ).toService();
- DistributedLogService.ServiceIface service =
- new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
- return new ClusterClient(client, service);
- }
-
- DistributedLogClientImpl buildClient() {
- checkNotNull(name, "No name provided.");
- checkNotNull(clientId, "No client id provided.");
- checkNotNull(routingServiceBuilder, "No routing service builder provided.");
- checkNotNull(statsReceiver, "No stats receiver provided.");
- if (null == streamStatsReceiver) {
- streamStatsReceiver = new NullStatsReceiver();
- }
-
- Optional<ClusterClient> serverRoutingServiceClient = Optional.absent();
- if (null != serverRoutingServiceFinagleName) {
- serverRoutingServiceClient = Optional.of(
- buildServerRoutingServiceClient(serverRoutingServiceFinagleName));
- }
-
- RoutingService routingService = routingServiceBuilder
- .statsReceiver(statsReceiver.scope("routing"))
- .build();
- DistributedLogClientImpl clientImpl =
- new DistributedLogClientImpl(
- name,
- clientId,
- routingService,
- clientBuilder,
- clientConfig,
- serverRoutingServiceClient,
- statsReceiver,
- streamStatsReceiver,
- regionResolver,
- enableRegionStats);
- routingService.startService();
- clientImpl.handshake();
- return clientImpl;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java b/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
deleted file mode 100644
index 033882f..0000000
--- a/distributedlog-client/src/main/java/org/apache/distributedlog/service/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * DistributedLog Service Client.
- */
-package org.apache.distributedlog.service;
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/resources/findbugsExclude.xml b/distributedlog-client/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index 05ee085..0000000
--- a/distributedlog-client/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,23 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-//-->
-<FindBugsFilter>
- <Match>
- <!-- generated code, we can't be held responsible for findbugs in it //-->
- <Class name="~org\.apache\.distributedlog\.thrift.*" />
- </Match>
-</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
deleted file mode 100644
index d7494de..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/TestDistributedLogMultiStreamWriter.java
+++ /dev/null
@@ -1,383 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client;
-
-import static com.google.common.base.Charsets.UTF_8;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import com.google.common.collect.Lists;
-import org.apache.distributedlog.DLSN;
-import org.apache.distributedlog.LogRecord;
-import org.apache.distributedlog.LogRecordSet;
-import org.apache.distributedlog.LogRecordSetBuffer;
-import org.apache.distributedlog.exceptions.LogRecordTooLongException;
-import org.apache.distributedlog.io.CompressionCodec;
-import org.apache.distributedlog.service.DistributedLogClient;
-import com.twitter.finagle.IndividualRequestTimeoutException;
-import com.twitter.util.Await;
-import com.twitter.util.Future;
-import com.twitter.util.Promise;
-import java.nio.ByteBuffer;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Test {@link DistributedLogMultiStreamWriter}.
- */
-public class TestDistributedLogMultiStreamWriter {
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithNullStreams() throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithEmptyStreamList() throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.<String>newArrayList())
- .build();
- }
-
- @Test(timeout = 20000, expected = NullPointerException.class)
- public void testBuildWithNullClient() throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .build();
- }
-
- @Test(timeout = 20000, expected = NullPointerException.class)
- public void testBuildWithNullCodec() throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(null)
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithInvalidSpeculativeSettings1()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(-1)
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithInvalidSpeculativeSettings2()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(5)
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithInvalidSpeculativeSettings3()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(-1)
- .build();
- }
-
- @Test(timeout = 20000, expected = IllegalArgumentException.class)
- public void testBuildWithInvalidSpeculativeSettings4()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(10)
- .build();
- }
-
- @Test(timeout = 20000)
- public void testBuildMultiStreamWriter()
- throws Exception {
- DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(50)
- .build();
- assertTrue(true);
- }
-
- @Test(timeout = 20000)
- public void testBuildWithPeriodicalFlushEnabled() throws Exception {
- ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(50)
- .flushIntervalMs(1000)
- .scheduler(executorService)
- .build();
- verify(executorService, times(1)).scheduleAtFixedRate(writer, 1000000, 1000000, TimeUnit.MICROSECONDS);
- }
-
- @Test(timeout = 20000)
- public void testBuildWithPeriodicalFlushDisabled() throws Exception {
- ScheduledExecutorService executorService = mock(ScheduledExecutorService.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(mock(DistributedLogClient.class))
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(50)
- .flushIntervalMs(0)
- .scheduler(executorService)
- .build();
- verify(executorService, times(0)).scheduleAtFixedRate(writer, 1000, 1000, TimeUnit.MILLISECONDS);
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testFlushWhenBufferIsFull() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
- .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
-
- ScheduledExecutorService executorService =
- Executors.newSingleThreadScheduledExecutor();
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(100000)
- .maxSpeculativeTimeoutMs(200000)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(500000)
- .flushIntervalMs(0)
- .bufferSize(0)
- .scheduler(executorService)
- .build();
-
- ByteBuffer buffer = ByteBuffer.wrap("test".getBytes(UTF_8));
- writer.write(buffer);
-
- verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testFlushWhenExceedMaxLogRecordSetSize()
- throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
- .thenReturn(Future.value(new DLSN(1L, 1L, 999L)));
- ScheduledExecutorService executorService =
- Executors.newSingleThreadScheduledExecutor();
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(100000)
- .maxSpeculativeTimeoutMs(200000)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(500000)
- .flushIntervalMs(0)
- .bufferSize(Integer.MAX_VALUE)
- .scheduler(executorService)
- .build();
-
- byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE - 3 * 100];
- ByteBuffer buffer1 = ByteBuffer.wrap(data);
- writer.write(buffer1);
- verify(client, times(0)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
- LogRecordSet.Writer recordSetWriter1 = writer.getLogRecordSetWriter();
- assertEquals(1, recordSetWriter1.getNumRecords());
- assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter1.getNumBytes());
-
- ByteBuffer buffer2 = ByteBuffer.wrap(data);
- writer.write(buffer2);
- verify(client, times(1)).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
- LogRecordSet.Writer recordSetWriter2 = writer.getLogRecordSetWriter();
- assertEquals(1, recordSetWriter2.getNumRecords());
- assertEquals(LogRecordSet.HEADER_LEN + 4 + data.length, recordSetWriter2.getNumBytes());
- assertTrue(recordSetWriter1 != recordSetWriter2);
-
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testWriteTooLargeRecord() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(100000)
- .maxSpeculativeTimeoutMs(200000)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(5000000)
- .flushIntervalMs(0)
- .bufferSize(0)
- .build();
-
- byte[] data = new byte[LogRecord.MAX_LOGRECORD_SIZE + 10];
- ByteBuffer buffer = ByteBuffer.wrap(data);
- Future<DLSN> writeFuture = writer.write(buffer);
- assertTrue(writeFuture.isDefined());
- try {
- Await.result(writeFuture);
- fail("Should fail on writing too long record");
- } catch (LogRecordTooLongException lrtle) {
- // expected
- }
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testSpeculativeWrite() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(5000000)
- .flushIntervalMs(0)
- .bufferSize(0)
- .build();
-
- final String secondStream = writer.getStream(1);
-
- final DLSN dlsn = new DLSN(99L, 88L, 0L);
-
- Mockito.doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- Object[] arguments = invocation.getArguments();
- String stream = (String) arguments[0];
- if (stream.equals(secondStream)) {
- return Future.value(dlsn);
- } else {
- return new Promise<DLSN>();
- }
- }
- }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
- byte[] data = "test-test".getBytes(UTF_8);
- ByteBuffer buffer = ByteBuffer.wrap(data);
- Future<DLSN> writeFuture = writer.write(buffer);
- DLSN writeDLSN = Await.result(writeFuture);
- assertEquals(dlsn, writeDLSN);
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testPeriodicalFlush() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(5000000)
- .flushIntervalMs(10)
- .bufferSize(Integer.MAX_VALUE)
- .build();
-
- final DLSN dlsn = new DLSN(99L, 88L, 0L);
-
- Mockito.doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- return Future.value(dlsn);
- }
- }).when(client).writeRecordSet((String) any(), (LogRecordSetBuffer) any());
-
- byte[] data = "test-test".getBytes(UTF_8);
- ByteBuffer buffer = ByteBuffer.wrap(data);
- Future<DLSN> writeFuture = writer.write(buffer);
- DLSN writeDLSN = Await.result(writeFuture);
- assertEquals(dlsn, writeDLSN);
- writer.close();
- }
-
- @Test(timeout = 20000)
- public void testFailRequestAfterRetriedAllStreams() throws Exception {
- DistributedLogClient client = mock(DistributedLogClient.class);
- when(client.writeRecordSet((String) any(), (LogRecordSetBuffer) any()))
- .thenReturn(new Promise<DLSN>());
- DistributedLogMultiStreamWriter writer = DistributedLogMultiStreamWriter.newBuilder()
- .streams(Lists.newArrayList("stream1", "stream2"))
- .client(client)
- .compressionCodec(CompressionCodec.Type.LZ4)
- .firstSpeculativeTimeoutMs(10)
- .maxSpeculativeTimeoutMs(20)
- .speculativeBackoffMultiplier(2)
- .requestTimeoutMs(5000000)
- .flushIntervalMs(10)
- .bufferSize(Integer.MAX_VALUE)
- .build();
-
- byte[] data = "test-test".getBytes(UTF_8);
- ByteBuffer buffer = ByteBuffer.wrap(data);
- Future<DLSN> writeFuture = writer.write(buffer);
- try {
- Await.result(writeFuture);
- fail("Should fail the request after retries all streams");
- } catch (IndividualRequestTimeoutException e) {
- long timeoutMs = e.timeout().inMilliseconds();
- assertTrue(timeoutMs >= (10 + 20) && timeoutMs < 5000000);
- }
- writer.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
deleted file mode 100644
index 86d1c11..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/ownership/TestOwnershipCache.java
+++ /dev/null
@@ -1,207 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.ownership;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.distributedlog.client.ClientConfig;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.Set;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Case for Ownership Cache.
- */
-public class TestOwnershipCache {
-
- @Rule
- public TestName runtime = new TestName();
-
- private static OwnershipCache createOwnershipCache() {
- ClientConfig clientConfig = new ClientConfig();
- return new OwnershipCache(clientConfig, null,
- NullStatsReceiver.get(), NullStatsReceiver.get());
- }
-
- private static SocketAddress createSocketAddress(int port) {
- return new InetSocketAddress("127.0.0.1", port);
- }
-
- @Test(timeout = 60000)
- public void testUpdateOwner() {
- OwnershipCache cache = createOwnershipCache();
- SocketAddress addr = createSocketAddress(1000);
- String stream = runtime.getMethodName();
-
- assertTrue("Should successfully update owner if no owner exists before",
- cache.updateOwner(stream, addr));
- assertEquals("Owner should be " + addr + " for stream " + stream,
- addr, cache.getOwner(stream));
- assertTrue("Should successfully update owner if old owner is same",
- cache.updateOwner(stream, addr));
- assertEquals("Owner should be " + addr + " for stream " + stream,
- addr, cache.getOwner(stream));
- }
-
- @Test(timeout = 60000)
- public void testRemoveOwnerFromStream() {
- OwnershipCache cache = createOwnershipCache();
- int initialPort = 2000;
- int numProxies = 2;
- int numStreamsPerProxy = 2;
- for (int i = 0; i < numProxies; i++) {
- SocketAddress addr = createSocketAddress(initialPort + i);
- for (int j = 0; j < numStreamsPerProxy; j++) {
- String stream = runtime.getMethodName() + "_" + i + "_" + j;
- cache.updateOwner(stream, addr);
- }
- }
- Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
- assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
- numProxies * numStreamsPerProxy, ownershipMap.size());
- Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
- assertEquals("There should be " + numProxies + " proxies cached",
- numProxies, ownershipDistribution.size());
-
- String stream = runtime.getMethodName() + "_0_0";
- SocketAddress owner = createSocketAddress(initialPort);
-
- // remove non-existent mapping won't change anything
- SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
- cache.removeOwnerFromStream(stream, nonExistentAddr, "remove-non-existent-addr");
- assertEquals("Owner " + owner + " should not be removed",
- owner, cache.getOwner(stream));
- ownershipMap = cache.getStreamOwnerMapping();
- assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
- numProxies * numStreamsPerProxy, ownershipMap.size());
-
- // remove existent mapping should remove ownership mapping
- cache.removeOwnerFromStream(stream, owner, "remove-owner");
- assertNull("Owner " + owner + " should be removed", cache.getOwner(stream));
- ownershipMap = cache.getStreamOwnerMapping();
- assertEquals("There should be " + (numProxies * numStreamsPerProxy - 1) + " entries left in cache",
- numProxies * numStreamsPerProxy - 1, ownershipMap.size());
- ownershipDistribution = cache.getStreamOwnershipDistribution();
- assertEquals("There should still be " + numProxies + " proxies cached",
- numProxies, ownershipDistribution.size());
- Set<String> ownedStreams = ownershipDistribution.get(owner);
- assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned for " + owner,
- numStreamsPerProxy - 1, ownedStreams.size());
- assertFalse("Stream " + stream + " should not be owned by " + owner,
- ownedStreams.contains(stream));
- }
-
- @Test(timeout = 60000)
- public void testRemoveAllStreamsFromOwner() {
- OwnershipCache cache = createOwnershipCache();
- int initialPort = 2000;
- int numProxies = 2;
- int numStreamsPerProxy = 2;
- for (int i = 0; i < numProxies; i++) {
- SocketAddress addr = createSocketAddress(initialPort + i);
- for (int j = 0; j < numStreamsPerProxy; j++) {
- String stream = runtime.getMethodName() + "_" + i + "_" + j;
- cache.updateOwner(stream, addr);
- }
- }
- Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
- assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
- numProxies * numStreamsPerProxy, ownershipMap.size());
- Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
- assertEquals("There should be " + numProxies + " proxies cached",
- numProxies, ownershipDistribution.size());
-
- SocketAddress owner = createSocketAddress(initialPort);
-
- // remove non-existent host won't change anything
- SocketAddress nonExistentAddr = createSocketAddress(initialPort + 999);
- cache.removeAllStreamsFromOwner(nonExistentAddr);
- ownershipMap = cache.getStreamOwnerMapping();
- assertEquals("There should still be " + (numProxies * numStreamsPerProxy) + " entries in cache",
- numProxies * numStreamsPerProxy, ownershipMap.size());
- ownershipDistribution = cache.getStreamOwnershipDistribution();
- assertEquals("There should still be " + numProxies + " proxies cached",
- numProxies, ownershipDistribution.size());
-
- // remove existent host should remove ownership mapping
- cache.removeAllStreamsFromOwner(owner);
- ownershipMap = cache.getStreamOwnerMapping();
- assertEquals("There should be " + ((numProxies - 1) * numStreamsPerProxy) + " entries left in cache",
- (numProxies - 1) * numStreamsPerProxy, ownershipMap.size());
- ownershipDistribution = cache.getStreamOwnershipDistribution();
- assertEquals("There should be " + (numProxies - 1) + " proxies cached",
- numProxies - 1, ownershipDistribution.size());
- assertFalse("Host " + owner + " should not be cached",
- ownershipDistribution.containsKey(owner));
- }
-
- @Test(timeout = 60000)
- public void testReplaceOwner() {
- OwnershipCache cache = createOwnershipCache();
- int initialPort = 2000;
- int numProxies = 2;
- int numStreamsPerProxy = 2;
- for (int i = 0; i < numProxies; i++) {
- SocketAddress addr = createSocketAddress(initialPort + i);
- for (int j = 0; j < numStreamsPerProxy; j++) {
- String stream = runtime.getMethodName() + "_" + i + "_" + j;
- cache.updateOwner(stream, addr);
- }
- }
- Map<String, SocketAddress> ownershipMap = cache.getStreamOwnerMapping();
- assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
- numProxies * numStreamsPerProxy, ownershipMap.size());
- Map<SocketAddress, Set<String>> ownershipDistribution = cache.getStreamOwnershipDistribution();
- assertEquals("There should be " + numProxies + " proxies cached",
- numProxies, ownershipDistribution.size());
-
- String stream = runtime.getMethodName() + "_0_0";
- SocketAddress oldOwner = createSocketAddress(initialPort);
- SocketAddress newOwner = createSocketAddress(initialPort + 999);
-
- cache.updateOwner(stream, newOwner);
- assertEquals("Owner of " + stream + " should be changed from " + oldOwner + " to " + newOwner,
- newOwner, cache.getOwner(stream));
- ownershipMap = cache.getStreamOwnerMapping();
- assertEquals("There should be " + (numProxies * numStreamsPerProxy) + " entries in cache",
- numProxies * numStreamsPerProxy, ownershipMap.size());
- assertEquals("Owner of " + stream + " should be " + newOwner,
- newOwner, ownershipMap.get(stream));
- ownershipDistribution = cache.getStreamOwnershipDistribution();
- assertEquals("There should be " + (numProxies + 1) + " proxies cached",
- numProxies + 1, ownershipDistribution.size());
- Set<String> oldOwnedStreams = ownershipDistribution.get(oldOwner);
- assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + oldOwner,
- numStreamsPerProxy - 1, oldOwnedStreams.size());
- assertFalse("Stream " + stream + " should not be owned by " + oldOwner,
- oldOwnedStreams.contains(stream));
- Set<String> newOwnedStreams = ownershipDistribution.get(newOwner);
- assertEquals("There should be only " + (numStreamsPerProxy - 1) + " streams owned by " + newOwner,
- 1, newOwnedStreams.size());
- assertTrue("Stream " + stream + " should be owned by " + newOwner,
- newOwnedStreams.contains(stream));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
deleted file mode 100644
index 8ef33bd..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockDistributedLogServices.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import org.apache.distributedlog.thrift.service.BulkWriteResponse;
-import org.apache.distributedlog.thrift.service.ClientInfo;
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import org.apache.distributedlog.thrift.service.HeartbeatOptions;
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import org.apache.distributedlog.thrift.service.WriteContext;
-import org.apache.distributedlog.thrift.service.WriteResponse;
-import com.twitter.util.Future;
-import java.nio.ByteBuffer;
-import java.util.List;
-
-/**
- * Mock DistributedLog Related Services.
- */
-public class MockDistributedLogServices {
-
- /**
- * Mock basic service.
- */
- static class MockBasicService implements DistributedLogService.ServiceIface {
-
- @Override
- public Future<ServerInfo> handshake() {
- return Future.value(new ServerInfo());
- }
-
- @Override
- public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
- return Future.value(new ServerInfo());
- }
-
- @Override
- public Future<WriteResponse> heartbeat(String stream, WriteContext ctx) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<WriteResponse> heartbeatWithOptions(String stream,
- WriteContext ctx,
- HeartbeatOptions options) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<WriteResponse> write(String stream,
- ByteBuffer data) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<WriteResponse> writeWithContext(String stream,
- ByteBuffer data,
- WriteContext ctx) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<BulkWriteResponse> writeBulkWithContext(String stream,
- List<ByteBuffer> data,
- WriteContext ctx) {
- return Future.value(new BulkWriteResponse());
- }
-
- @Override
- public Future<WriteResponse> truncate(String stream,
- String dlsn,
- WriteContext ctx) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<WriteResponse> release(String stream,
- WriteContext ctx) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<WriteResponse> create(String stream, WriteContext ctx) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<WriteResponse> delete(String stream,
- WriteContext ctx) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<WriteResponse> getOwner(String stream, WriteContext ctx) {
- return Future.value(new WriteResponse());
- }
-
- @Override
- public Future<Void> setAcceptNewStream(boolean enabled) {
- return Future.value(null);
- }
- }
-
- /**
- * Mock server info service.
- */
- public static class MockServerInfoService extends MockBasicService {
-
- protected ServerInfo serverInfo;
-
- public MockServerInfoService() {
- serverInfo = new ServerInfo();
- }
-
- public void updateServerInfo(ServerInfo serverInfo) {
- this.serverInfo = serverInfo;
- }
-
- @Override
- public Future<ServerInfo> handshake() {
- return Future.value(serverInfo);
- }
-
- @Override
- public Future<ServerInfo> handshakeWithClientInfo(ClientInfo clientInfo) {
- return Future.value(serverInfo);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
deleted file mode 100644
index e38c2ed..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockProxyClientBuilder.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import org.apache.distributedlog.thrift.service.DistributedLogService;
-import java.net.SocketAddress;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Mock Proxy Client Builder.
- */
-class MockProxyClientBuilder implements ProxyClient.Builder {
-
- static class MockProxyClient extends ProxyClient {
- MockProxyClient(SocketAddress address,
- DistributedLogService.ServiceIface service) {
- super(address, new MockThriftClient(), service);
- }
- }
-
- private final ConcurrentMap<SocketAddress, MockProxyClient> clients =
- new ConcurrentHashMap<SocketAddress, MockProxyClient>();
-
- public void provideProxyClient(SocketAddress address,
- MockProxyClient proxyClient) {
- clients.put(address, proxyClient);
- }
-
- @Override
- public ProxyClient build(SocketAddress address) {
- return clients.get(address);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
deleted file mode 100644
index ad1c878..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/MockThriftClient.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import com.twitter.finagle.Service;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Future;
-
-/**
- * Mock Thrift Client.
- */
-class MockThriftClient extends Service<ThriftClientRequest, byte[]> {
- @Override
- public Future<byte[]> apply(ThriftClientRequest request) {
- return Future.value(request.message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/c44e0278/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java b/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
deleted file mode 100644
index 6d9a471..0000000
--- a/distributedlog-client/src/test/java/org/apache/distributedlog/client/proxy/TestProxyClientManager.java
+++ /dev/null
@@ -1,368 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.distributedlog.client.proxy;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Maps;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.distributedlog.client.ClientConfig;
-import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockBasicService;
-import org.apache.distributedlog.client.proxy.MockDistributedLogServices.MockServerInfoService;
-import org.apache.distributedlog.client.proxy.MockProxyClientBuilder.MockProxyClient;
-import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
-import org.apache.distributedlog.client.stats.ClientStats;
-import org.apache.distributedlog.thrift.service.ServerInfo;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.tuple.Pair;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestName;
-
-/**
- * Test Proxy Client Manager.
- */
-public class TestProxyClientManager {
-
- @Rule
- public TestName runtime = new TestName();
-
- static class TestHostProvider implements HostProvider {
-
- Set<SocketAddress> hosts = new HashSet<SocketAddress>();
-
- synchronized void addHost(SocketAddress host) {
- hosts.add(host);
- }
-
- @Override
- public synchronized Set<SocketAddress> getHosts() {
- return ImmutableSet.copyOf(hosts);
- }
-
- }
-
- private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
- long periodicHandshakeIntervalMs) {
- HostProvider provider = new TestHostProvider();
- return createProxyClientManager(builder, provider, periodicHandshakeIntervalMs);
- }
-
- private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder,
- HostProvider hostProvider,
- long periodicHandshakeIntervalMs) {
- ClientConfig clientConfig = new ClientConfig();
- clientConfig.setPeriodicHandshakeIntervalMs(periodicHandshakeIntervalMs);
- clientConfig.setPeriodicOwnershipSyncIntervalMs(-1);
- HashedWheelTimer dlTimer = new HashedWheelTimer(
- new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(),
- clientConfig.getRedirectBackoffStartMs(),
- TimeUnit.MILLISECONDS);
- return new ProxyClientManager(clientConfig, builder, dlTimer, hostProvider,
- new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver()));
- }
-
- private static SocketAddress createSocketAddress(int port) {
- return new InetSocketAddress("127.0.0.1", port);
- }
-
- private static MockProxyClient createMockProxyClient(SocketAddress address) {
- return new MockProxyClient(address, new MockBasicService());
- }
-
- private static Pair<MockProxyClient, MockServerInfoService> createMockProxyClient(
- SocketAddress address, ServerInfo serverInfo) {
- MockServerInfoService service = new MockServerInfoService();
- MockProxyClient proxyClient = new MockProxyClient(address, service);
- service.updateServerInfo(serverInfo);
- return Pair.of(proxyClient, service);
- }
-
- @Test(timeout = 60000)
- public void testBasicCreateRemove() throws Exception {
- SocketAddress address = createSocketAddress(1000);
- MockProxyClientBuilder builder = new MockProxyClientBuilder();
- MockProxyClient mockProxyClient = createMockProxyClient(address);
- builder.provideProxyClient(address, mockProxyClient);
-
- ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
- assertEquals("There should be no clients in the manager",
- 0, clientManager.getNumProxies());
- ProxyClient proxyClient = clientManager.createClient(address);
- assertEquals("Create client should build the proxy client",
- 1, clientManager.getNumProxies());
- assertTrue("The client returned should be the same client that builder built",
- mockProxyClient == proxyClient);
- }
-
- @Test(timeout = 60000)
- public void testGetShouldCreateClient() throws Exception {
- SocketAddress address = createSocketAddress(2000);
- MockProxyClientBuilder builder = new MockProxyClientBuilder();
- MockProxyClient mockProxyClient = createMockProxyClient(address);
- builder.provideProxyClient(address, mockProxyClient);
-
- ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
- assertEquals("There should be no clients in the manager",
- 0, clientManager.getNumProxies());
- ProxyClient proxyClient = clientManager.getClient(address);
- assertEquals("Get client should build the proxy client",
- 1, clientManager.getNumProxies());
- assertTrue("The client returned should be the same client that builder built",
- mockProxyClient == proxyClient);
- }
-
- @Test(timeout = 60000)
- public void testConditionalRemoveClient() throws Exception {
- SocketAddress address = createSocketAddress(3000);
- MockProxyClientBuilder builder = new MockProxyClientBuilder();
- MockProxyClient mockProxyClient = createMockProxyClient(address);
- MockProxyClient anotherMockProxyClient = createMockProxyClient(address);
- builder.provideProxyClient(address, mockProxyClient);
-
- ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
- assertEquals("There should be no clients in the manager",
- 0, clientManager.getNumProxies());
- clientManager.createClient(address);
- assertEquals("Create client should build the proxy client",
- 1, clientManager.getNumProxies());
- clientManager.removeClient(address, anotherMockProxyClient);
- assertEquals("Conditional remove should not remove proxy client",
- 1, clientManager.getNumProxies());
- clientManager.removeClient(address, mockProxyClient);
- assertEquals("Conditional remove should remove proxy client",
- 0, clientManager.getNumProxies());
- }
-
- @Test(timeout = 60000)
- public void testRemoveClient() throws Exception {
- SocketAddress address = createSocketAddress(3000);
- MockProxyClientBuilder builder = new MockProxyClientBuilder();
- MockProxyClient mockProxyClient = createMockProxyClient(address);
- builder.provideProxyClient(address, mockProxyClient);
-
- ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
- assertEquals("There should be no clients in the manager",
- 0, clientManager.getNumProxies());
- clientManager.createClient(address);
- assertEquals("Create client should build the proxy client",
- 1, clientManager.getNumProxies());
- clientManager.removeClient(address);
- assertEquals("Remove should remove proxy client",
- 0, clientManager.getNumProxies());
- }
-
- @Test(timeout = 60000)
- public void testCreateClientShouldHandshake() throws Exception {
- SocketAddress address = createSocketAddress(3000);
- MockProxyClientBuilder builder = new MockProxyClientBuilder();
- ServerInfo serverInfo = new ServerInfo();
- serverInfo.putToOwnerships(runtime.getMethodName() + "_stream",
- runtime.getMethodName() + "_owner");
- Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
- createMockProxyClient(address, serverInfo);
- builder.provideProxyClient(address, mockProxyClient.getLeft());
-
- final AtomicReference<ServerInfo> resultHolder = new AtomicReference<ServerInfo>(null);
- final CountDownLatch doneLatch = new CountDownLatch(1);
- ProxyListener listener = new ProxyListener() {
- @Override
- public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
- resultHolder.set(serverInfo);
- doneLatch.countDown();
- }
- @Override
- public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
- }
- };
-
- ProxyClientManager clientManager = createProxyClientManager(builder, 0L);
- clientManager.registerProxyListener(listener);
- assertEquals("There should be no clients in the manager",
- 0, clientManager.getNumProxies());
- clientManager.createClient(address);
- assertEquals("Create client should build the proxy client",
- 1, clientManager.getNumProxies());
-
- // When a client is created, it would handshake with that proxy
- doneLatch.await();
- assertEquals("Handshake should return server info",
- serverInfo, resultHolder.get());
- }
-
- @Test(timeout = 60000)
- public void testHandshake() throws Exception {
- final int numHosts = 3;
- final int numStreamsPerHost = 3;
- final int initialPort = 4000;
-
- MockProxyClientBuilder builder = new MockProxyClientBuilder();
- Map<SocketAddress, ServerInfo> serverInfoMap =
- new HashMap<SocketAddress, ServerInfo>();
- for (int i = 0; i < numHosts; i++) {
- SocketAddress address = createSocketAddress(initialPort + i);
- ServerInfo serverInfo = new ServerInfo();
- for (int j = 0; j < numStreamsPerHost; j++) {
- serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
- address.toString());
- }
- Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
- createMockProxyClient(address, serverInfo);
- builder.provideProxyClient(address, mockProxyClient.getLeft());
- serverInfoMap.put(address, serverInfo);
- }
-
- final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
- final CountDownLatch doneLatch = new CountDownLatch(2 * numHosts);
- ProxyListener listener = new ProxyListener() {
- @Override
- public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
- synchronized (results) {
- results.put(address, serverInfo);
- }
- doneLatch.countDown();
- }
-
- @Override
- public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
- }
- };
-
- TestHostProvider rs = new TestHostProvider();
- ProxyClientManager clientManager = createProxyClientManager(builder, rs, 0L);
- clientManager.registerProxyListener(listener);
- assertEquals("There should be no clients in the manager",
- 0, clientManager.getNumProxies());
- for (int i = 0; i < numHosts; i++) {
- rs.addHost(createSocketAddress(initialPort + i));
- }
- // handshake would handshake with 3 hosts again
- clientManager.handshake();
- doneLatch.await();
- assertEquals("Handshake should return server info",
- numHosts, results.size());
- assertTrue("Handshake should get all server infos",
- Maps.difference(serverInfoMap, results).areEqual());
- }
-
- @Test(timeout = 60000)
- public void testPeriodicHandshake() throws Exception {
- final int numHosts = 3;
- final int numStreamsPerHost = 3;
- final int initialPort = 5000;
-
- MockProxyClientBuilder builder = new MockProxyClientBuilder();
- Map<SocketAddress, ServerInfo> serverInfoMap =
- new HashMap<SocketAddress, ServerInfo>();
- Map<SocketAddress, MockServerInfoService> mockServiceMap =
- new HashMap<SocketAddress, MockServerInfoService>();
- final Map<SocketAddress, CountDownLatch> hostDoneLatches =
- new HashMap<SocketAddress, CountDownLatch>();
- for (int i = 0; i < numHosts; i++) {
- SocketAddress address = createSocketAddress(initialPort + i);
- ServerInfo serverInfo = new ServerInfo();
- for (int j = 0; j < numStreamsPerHost; j++) {
- serverInfo.putToOwnerships(runtime.getMethodName() + "_stream_" + j,
- address.toString());
- }
- Pair<MockProxyClient, MockServerInfoService> mockProxyClient =
- createMockProxyClient(address, serverInfo);
- builder.provideProxyClient(address, mockProxyClient.getLeft());
- serverInfoMap.put(address, serverInfo);
- mockServiceMap.put(address, mockProxyClient.getRight());
- hostDoneLatches.put(address, new CountDownLatch(2));
- }
-
- final Map<SocketAddress, ServerInfo> results = new HashMap<SocketAddress, ServerInfo>();
- final CountDownLatch doneLatch = new CountDownLatch(numHosts);
- ProxyListener listener = new ProxyListener() {
- @Override
- public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
- synchronized (results) {
- results.put(address, serverInfo);
- CountDownLatch latch = hostDoneLatches.get(address);
- if (null != latch) {
- latch.countDown();
- }
- }
- doneLatch.countDown();
- }
-
- @Override
- public void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause) {
- }
- };
-
- TestHostProvider rs = new TestHostProvider();
- ProxyClientManager clientManager = createProxyClientManager(builder, rs, 50L);
- clientManager.setPeriodicHandshakeEnabled(false);
- clientManager.registerProxyListener(listener);
-
- assertEquals("There should be no clients in the manager",
- 0, clientManager.getNumProxies());
- for (int i = 0; i < numHosts; i++) {
- SocketAddress address = createSocketAddress(initialPort + i);
- rs.addHost(address);
- clientManager.createClient(address);
- }
-
- // make sure the first 3 handshakes going through
- doneLatch.await();
-
- assertEquals("Handshake should return server info",
- numHosts, results.size());
- assertTrue("Handshake should get all server infos",
- Maps.difference(serverInfoMap, results).areEqual());
-
- // update server info
- for (int i = 0; i < numHosts; i++) {
- SocketAddress address = createSocketAddress(initialPort + i);
- ServerInfo serverInfo = new ServerInfo();
- for (int j = 0; j < numStreamsPerHost; j++) {
- serverInfo.putToOwnerships(runtime.getMethodName() + "_new_stream_" + j,
- address.toString());
- }
- MockServerInfoService service = mockServiceMap.get(address);
- serverInfoMap.put(address, serverInfo);
- service.updateServerInfo(serverInfo);
- }
-
- clientManager.setPeriodicHandshakeEnabled(true);
- for (int i = 0; i < numHosts; i++) {
- SocketAddress address = createSocketAddress(initialPort + i);
- CountDownLatch latch = hostDoneLatches.get(address);
- latch.await();
- }
-
- assertTrue("Periodic handshake should update all server infos",
- Maps.difference(serverInfoMap, results).areEqual());
- }
-
-}