You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@distributedlog.apache.org by si...@apache.org on 2017/01/05 00:51:51 UTC
[46/51] [partial] incubator-distributedlog git commit: DL-4:
Repackage the source under apache namespace
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java
deleted file mode 100644
index d131e28..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClient.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import com.twitter.distributedlog.client.ClientConfig;
-import com.twitter.distributedlog.client.stats.ClientStats;
-import com.twitter.distributedlog.thrift.service.DistributedLogService;
-import com.twitter.finagle.Service;
-import com.twitter.finagle.ThriftMux;
-import com.twitter.finagle.builder.ClientBuilder;
-import com.twitter.finagle.thrift.ClientId;
-import com.twitter.finagle.thrift.ThriftClientFramedCodec;
-import com.twitter.finagle.thrift.ThriftClientRequest;
-import com.twitter.util.Duration;
-import com.twitter.util.Future;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import scala.Option;
-import scala.runtime.BoxedUnit;
-
-/**
- * A client bound to exactly one proxy.
- *
- * <p>It bundles the proxy address, the finagle {@link Service} used to reach it and the
- * thrift {@link DistributedLogService.ServiceIface} stub layered on top of that service.
- */
-public class ProxyClient {
-
-    private final SocketAddress address;
-    private final Service<ThriftClientRequest, byte[]> client;
-    private final DistributedLogService.ServiceIface service;
-
-    /**
-     * Factory that creates a {@link ProxyClient} for a given proxy <code>address</code>.
-     */
-    public interface Builder {
-        /**
-         * Create a proxy client talking to <code>address</code>.
-         *
-         * @param address
-         *          address of the proxy
-         * @return proxy client bound to that address
-         */
-        ProxyClient build(SocketAddress address);
-    }
-
-    /**
-     * Create the default {@link Builder}.
-     *
-     * @param clientName
-     *          name set on the underlying finagle client
-     * @param clientId
-     *          thrift client id attached to the codec or the thrift mux stack
-     * @param clientBuilder
-     *          finagle client builder to start from; when <code>null</code> a builder with
-     *          default timeouts is used
-     * @param clientConfig
-     *          client configuration (consulted for the thrift mux setting)
-     * @param clientStats
-     *          source of the per-host finagle stats receiver
-     * @return builder that produces one {@link ProxyClient} per address
-     */
-    public static Builder newBuilder(String clientName,
-                                     ClientId clientId,
-                                     ClientBuilder clientBuilder,
-                                     ClientConfig clientConfig,
-                                     ClientStats clientStats) {
-        return new DefaultBuilder(clientName, clientId, clientBuilder, clientConfig, clientStats);
-    }
-
-    /**
-     * Default {@link Builder}: prepares a shared finagle {@link ClientBuilder} once and
-     * points it at a concrete host on every {@link #build(SocketAddress)} call.
-     */
-    public static class DefaultBuilder implements Builder {
-
-        private final String clientName;
-        private final ClientId clientId;
-        private final ClientBuilder clientBuilder;
-        private final ClientStats clientStats;
-
-        private DefaultBuilder(String clientName,
-                               ClientId clientId,
-                               ClientBuilder clientBuilder,
-                               ClientConfig clientConfig,
-                               ClientStats clientStats) {
-            // clientName must be assigned before setDefaultSettings() reads it below.
-            this.clientName = clientName;
-            this.clientId = clientId;
-            this.clientStats = clientStats;
-            // fall back to the default builder only when the caller did not supply one
-            ClientBuilder baseBuilder = clientBuilder;
-            if (null == baseBuilder) {
-                baseBuilder = getDefaultClientBuilder(clientConfig);
-            }
-            ClientBuilder withDefaults = setDefaultSettings(baseBuilder);
-            this.clientBuilder = configureThriftMux(withDefaults, clientId, clientConfig);
-        }
-
-        /**
-         * Select the transport: the thrift mux stack when enabled in the configuration,
-         * otherwise the framed thrift codec. Both carry <code>clientId</code>.
-         */
-        @SuppressWarnings("unchecked")
-        private ClientBuilder configureThriftMux(ClientBuilder builder,
-                                                 ClientId clientId,
-                                                 ClientConfig clientConfig) {
-            if (!clientConfig.getThriftMux()) {
-                return builder.codec(ThriftClientFramedCodec.apply(Option.apply(clientId)));
-            }
-            return builder.stack(ThriftMux.client().withClientId(clientId));
-        }
-
-        /**
-         * Builder used when the caller passed none: 200ms tcp-connect and connect timeouts,
-         * a 1s request timeout and, without thrift mux, a host connection limit of 1.
-         */
-        private ClientBuilder getDefaultClientBuilder(ClientConfig clientConfig) {
-            ClientBuilder defaults = ClientBuilder.get()
-                .tcpConnectTimeout(Duration.fromMilliseconds(200))
-                .connectTimeout(Duration.fromMilliseconds(200))
-                .requestTimeout(Duration.fromSeconds(1));
-            if (clientConfig.getThriftMux()) {
-                return defaults;
-            }
-            return defaults.hostConnectionLimit(1);
-        }
-
-        /**
-         * Apply the settings every proxy client shares, regardless of where the builder
-         * came from.
-         */
-        @SuppressWarnings("unchecked")
-        private ClientBuilder setDefaultSettings(ClientBuilder builder) {
-            return builder.name(clientName)
-                .failFast(false)
-                .noFailureAccrual()
-                // Each finagle client talks to a single host, so retrying inside finagle is
-                // disabled: the first failure is surfaced immediately and the DL client can
-                // quickly detect it and retry against other proxies.
-                .retries(1)
-                .keepAlive(true);
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public ProxyClient build(SocketAddress address) {
-            // bind the shared builder to this host and its own stats receiver
-            ClientBuilder hostBuilder = clientBuilder
-                .hosts((InetSocketAddress) address)
-                .reportTo(clientStats.getFinagleStatsReceiver(address));
-            Service<ThriftClientRequest, byte[]> client =
-                ClientBuilder.safeBuildFactory(hostBuilder).toService();
-            DistributedLogService.ServiceIface service =
-                new DistributedLogService.ServiceToClient(client, new TBinaryProtocol.Factory());
-            return new ProxyClient(address, client, service);
-        }
-
-    }
-
-    /**
-     * Create a proxy client from already built parts.
-     *
-     * @param address
-     *          proxy address
-     * @param client
-     *          finagle service connected to the proxy
-     * @param service
-     *          thrift stub issuing requests through <code>client</code>
-     */
-    protected ProxyClient(SocketAddress address,
-                          Service<ThriftClientRequest, byte[]> client,
-                          DistributedLogService.ServiceIface service) {
-        this.address = address;
-        this.client = client;
-        this.service = service;
-    }
-
-    /**
-     * @return address of the proxy this client talks to
-     */
-    public SocketAddress getAddress() {
-        return address;
-    }
-
-    /**
-     * @return underlying finagle service
-     */
-    public Service<ThriftClientRequest, byte[]> getClient() {
-        return client;
-    }
-
-    /**
-     * @return thrift service stub
-     */
-    public DistributedLogService.ServiceIface getService() {
-        return service;
-    }
-
-    /**
-     * Close the underlying finagle service.
-     *
-     * @return future returned by closing the finagle service
-     */
-    public Future<BoxedUnit> close() {
-        return client.close();
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java
deleted file mode 100644
index c7d56f6..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyClientManager.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableMap;
-import com.twitter.distributedlog.client.ClientConfig;
-import com.twitter.distributedlog.client.stats.ClientStats;
-import com.twitter.distributedlog.client.stats.OpStats;
-import com.twitter.distributedlog.thrift.service.ClientInfo;
-import com.twitter.distributedlog.thrift.service.ServerInfo;
-import com.twitter.util.FutureEventListener;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Manager manages clients (channels) to proxies.
- */
-public class ProxyClientManager implements TimerTask {
-
- private static final Logger logger = LoggerFactory.getLogger(ProxyClientManager.class);
-
- private final ClientConfig clientConfig;
- private final ProxyClient.Builder clientBuilder;
- private final HashedWheelTimer timer;
- private final HostProvider hostProvider;
- private volatile Timeout periodicHandshakeTask;
- private final ConcurrentHashMap<SocketAddress, ProxyClient> address2Services =
- new ConcurrentHashMap<SocketAddress, ProxyClient>();
- private final CopyOnWriteArraySet<ProxyListener> proxyListeners =
- new CopyOnWriteArraySet<ProxyListener>();
- private volatile boolean closed = false;
- private volatile boolean periodicHandshakeEnabled = true;
- private final Stopwatch lastOwnershipSyncStopwatch;
-
- private final OpStats handshakeStats;
-
- public ProxyClientManager(ClientConfig clientConfig,
- ProxyClient.Builder clientBuilder,
- HashedWheelTimer timer,
- HostProvider hostProvider,
- ClientStats clientStats) {
- this.clientConfig = clientConfig;
- this.clientBuilder = clientBuilder;
- this.timer = timer;
- this.hostProvider = hostProvider;
- this.handshakeStats = clientStats.getOpStats("handshake");
- scheduleHandshake();
- this.lastOwnershipSyncStopwatch = Stopwatch.createStarted();
- }
-
- private void scheduleHandshake() {
- if (clientConfig.getPeriodicHandshakeIntervalMs() > 0) {
- periodicHandshakeTask = timer.newTimeout(this,
- clientConfig.getPeriodicHandshakeIntervalMs(), TimeUnit.MILLISECONDS);
- }
- }
-
- void setPeriodicHandshakeEnabled(boolean enabled) {
- this.periodicHandshakeEnabled = enabled;
- }
-
- @Override
- public void run(Timeout timeout) throws Exception {
- if (timeout.isCancelled() || closed) {
- return;
- }
- if (periodicHandshakeEnabled) {
- final boolean syncOwnerships = lastOwnershipSyncStopwatch.elapsed(TimeUnit.MILLISECONDS)
- >= clientConfig.getPeriodicOwnershipSyncIntervalMs();
-
- final Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
- final AtomicInteger numHosts = new AtomicInteger(hostsSnapshot.size());
- final AtomicInteger numStreams = new AtomicInteger(0);
- final AtomicInteger numSuccesses = new AtomicInteger(0);
- final AtomicInteger numFailures = new AtomicInteger(0);
- final ConcurrentMap<SocketAddress, Integer> streamDistributions =
- new ConcurrentHashMap<SocketAddress, Integer>();
- final Stopwatch stopwatch = Stopwatch.createStarted();
- for (SocketAddress host : hostsSnapshot) {
- final SocketAddress address = host;
- final ProxyClient client = getClient(address);
- handshake(address, client, new FutureEventListener<ServerInfo>() {
- @Override
- public void onSuccess(ServerInfo serverInfo) {
- numStreams.addAndGet(serverInfo.getOwnershipsSize());
- numSuccesses.incrementAndGet();
- notifyHandshakeSuccess(address, client, serverInfo, false, stopwatch);
- if (clientConfig.isHandshakeTracingEnabled()) {
- streamDistributions.putIfAbsent(address, serverInfo.getOwnershipsSize());
- }
- complete();
- }
-
- @Override
- public void onFailure(Throwable cause) {
- numFailures.incrementAndGet();
- notifyHandshakeFailure(address, client, cause, stopwatch);
- complete();
- }
-
- private void complete() {
- if (0 == numHosts.decrementAndGet()) {
- if (syncOwnerships) {
- logger.info("Periodic handshaked with {} hosts : {} streams returned,"
- + " {} hosts succeeded, {} hosts failed",
- new Object[] {
- hostsSnapshot.size(),
- numStreams.get(),
- numSuccesses.get(),
- numFailures.get()});
- if (clientConfig.isHandshakeTracingEnabled()) {
- logger.info("Periodic handshaked stream distribution : {}", streamDistributions);
- }
- }
- }
- }
- }, false, syncOwnerships);
- }
-
- if (syncOwnerships) {
- lastOwnershipSyncStopwatch.reset().start();
- }
- }
- scheduleHandshake();
- }
-
- /**
- * Register a proxy <code>listener</code> on proxy related changes.
- *
- * @param listener
- * proxy listener
- */
- public void registerProxyListener(ProxyListener listener) {
- proxyListeners.add(listener);
- }
-
- private void notifyHandshakeSuccess(SocketAddress address,
- ProxyClient client,
- ServerInfo serverInfo,
- boolean logging,
- Stopwatch stopwatch) {
- if (logging) {
- if (null != serverInfo && serverInfo.isSetOwnerships()) {
- logger.info("Handshaked with {} : {} ownerships returned.",
- address, serverInfo.getOwnerships().size());
- } else {
- logger.info("Handshaked with {} : no ownerships returned", address);
- }
- }
- handshakeStats.completeRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
- for (ProxyListener listener : proxyListeners) {
- listener.onHandshakeSuccess(address, client, serverInfo);
- }
- }
-
- private void notifyHandshakeFailure(SocketAddress address,
- ProxyClient client,
- Throwable cause,
- Stopwatch stopwatch) {
- handshakeStats.failRequest(address, stopwatch.elapsed(TimeUnit.MICROSECONDS), 1);
- for (ProxyListener listener : proxyListeners) {
- listener.onHandshakeFailure(address, client, cause);
- }
- }
-
- /**
- * Retrieve a client to proxy <code>address</code>.
- *
- * @param address
- * proxy address
- * @return proxy client
- */
- public ProxyClient getClient(final SocketAddress address) {
- ProxyClient sc = address2Services.get(address);
- if (null != sc) {
- return sc;
- }
- return createClient(address);
- }
-
- /**
- * Remove the client to proxy <code>address</code>.
- *
- * @param address
- * proxy address
- */
- public void removeClient(SocketAddress address) {
- ProxyClient sc = address2Services.remove(address);
- if (null != sc) {
- logger.info("Removed host {}.", address);
- sc.close();
- }
- }
-
- /**
- * Remove the client <code>sc</code> to proxy <code>address</code>.
- *
- * @param address
- * proxy address
- * @param sc
- * proxy client
- */
- public void removeClient(SocketAddress address, ProxyClient sc) {
- if (address2Services.remove(address, sc)) {
- logger.info("Remove client {} to host {}.", sc, address);
- sc.close();
- }
- }
-
- /**
- * Create a client to proxy <code>address</code>.
- *
- * @param address
- * proxy address
- * @return proxy client
- */
- public ProxyClient createClient(final SocketAddress address) {
- final ProxyClient sc = clientBuilder.build(address);
- ProxyClient oldSC = address2Services.putIfAbsent(address, sc);
- if (null != oldSC) {
- sc.close();
- return oldSC;
- } else {
- final Stopwatch stopwatch = Stopwatch.createStarted();
- FutureEventListener<ServerInfo> listener = new FutureEventListener<ServerInfo>() {
- @Override
- public void onSuccess(ServerInfo serverInfo) {
- notifyHandshakeSuccess(address, sc, serverInfo, true, stopwatch);
- }
- @Override
- public void onFailure(Throwable cause) {
- notifyHandshakeFailure(address, sc, cause, stopwatch);
- }
- };
- // send a ping messaging after creating connections.
- handshake(address, sc, listener, true, true);
- return sc;
- }
- }
-
- /**
- * Handshake with a given proxy.
- *
- * @param address
- * proxy address
- * @param sc
- * proxy client
- * @param listener
- * listener on handshake result
- */
- private void handshake(SocketAddress address,
- ProxyClient sc,
- FutureEventListener<ServerInfo> listener,
- boolean logging,
- boolean getOwnerships) {
- if (clientConfig.getHandshakeWithClientInfo()) {
- ClientInfo clientInfo = new ClientInfo();
- clientInfo.setGetOwnerships(getOwnerships);
- clientInfo.setStreamNameRegex(clientConfig.getStreamNameRegex());
- if (logging) {
- logger.info("Handshaking with {} : {}", address, clientInfo);
- }
- sc.getService().handshakeWithClientInfo(clientInfo)
- .addEventListener(listener);
- } else {
- if (logging) {
- logger.info("Handshaking with {}", address);
- }
- sc.getService().handshake().addEventListener(listener);
- }
- }
-
- /**
- * Handshake with all proxies.
- *
- * <p>NOTE: this is a synchronous call.
- */
- public void handshake() {
- Set<SocketAddress> hostsSnapshot = hostProvider.getHosts();
- logger.info("Handshaking with {} hosts.", hostsSnapshot.size());
- final CountDownLatch latch = new CountDownLatch(hostsSnapshot.size());
- final Stopwatch stopwatch = Stopwatch.createStarted();
- for (SocketAddress host: hostsSnapshot) {
- final SocketAddress address = host;
- final ProxyClient client = getClient(address);
- handshake(address, client, new FutureEventListener<ServerInfo>() {
- @Override
- public void onSuccess(ServerInfo serverInfo) {
- notifyHandshakeSuccess(address, client, serverInfo, true, stopwatch);
- latch.countDown();
- }
- @Override
- public void onFailure(Throwable cause) {
- notifyHandshakeFailure(address, client, cause, stopwatch);
- latch.countDown();
- }
- }, true, true);
- }
- try {
- latch.await(1, TimeUnit.MINUTES);
- } catch (InterruptedException e) {
- logger.warn("Interrupted on handshaking with servers : ", e);
- }
- }
-
- /**
- * Return number of proxies managed by client manager.
- *
- * @return number of proxies managed by client manager.
- */
- public int getNumProxies() {
- return address2Services.size();
- }
-
- /**
- * Return all clients.
- *
- * @return all clients.
- */
- public Map<SocketAddress, ProxyClient> getAllClients() {
- return ImmutableMap.copyOf(address2Services);
- }
-
- public void close() {
- closed = true;
- Timeout task = periodicHandshakeTask;
- if (null != task) {
- task.cancel();
- }
- for (ProxyClient sc : address2Services.values()) {
- sc.close();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java
deleted file mode 100644
index e024825..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/ProxyListener.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.proxy;
-
-import com.twitter.distributedlog.thrift.service.ServerInfo;
-import java.net.SocketAddress;
-
-/**
- * Listener on the outcome of handshakes with proxies.
- *
- * <p>{@link ProxyClientManager} invokes every registered listener after each handshake it
- * performs, whether the handshake succeeded or failed.
- */
-public interface ProxyListener {
- /**
- * Called after a handshake with a proxy succeeded, carrying the server info it returned.
- *
- * @param address
- * proxy address
- * @param client
- * proxy client that executes handshaking
- * @param serverInfo
- * proxy's server info. NOTE(review): the manager null-checks this value before
- * logging it, so implementations should presumably tolerate <code>null</code> - confirm.
- */
- void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo);
-
- /**
- * Called after a handshake with a proxy failed.
- *
- * @param address
- * proxy address
- * @param client
- * proxy client that executed the failed handshake
- * @param cause
- * failure reason
- */
- void onHandshakeFailure(SocketAddress address, ProxyClient client, Throwable cause);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/package-info.java
deleted file mode 100644
index dc28c76..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/proxy/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Clients that interact with individual proxies.
- */
-package com.twitter.distributedlog.client.proxy;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java
deleted file mode 100644
index ab2fbed..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/DefaultRegionResolver.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.resolver;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * Default implementation of {@link RegionResolver}.
- *
- * <p>An address is resolved in this order: an explicit override, if one was supplied at
- * construction time; otherwise the region is derived from the host name. Results are cached
- * per address until {@link #removeCachedHost(SocketAddress)} is called.
- */
-public class DefaultRegionResolver implements RegionResolver {
-
-    private static final String DEFAULT_REGION = "default-region";
-
-    private final Map<SocketAddress, String> regionOverrides =
-        new HashMap<SocketAddress, String>();
-    private final ConcurrentMap<SocketAddress, String> regionMap =
-        new ConcurrentHashMap<SocketAddress, String>();
-
-    /**
-     * Create a resolver without any region overrides.
-     */
-    public DefaultRegionResolver() {
-    }
-
-    /**
-     * Create a resolver with the given overrides; the map is copied.
-     *
-     * @param regionOverrides
-     *          regions to return for specific addresses instead of deriving them
-     */
-    public DefaultRegionResolver(Map<SocketAddress, String> regionOverrides) {
-        this.regionOverrides.putAll(regionOverrides);
-    }
-
-    @Override
-    public String resolveRegion(SocketAddress address) {
-        String cached = regionMap.get(address);
-        if (null != cached) {
-            return cached;
-        }
-        String resolved = doResolveRegion(address);
-        regionMap.put(address, resolved);
-        return resolved;
-    }
-
-    /**
-     * Resolve the region without consulting the cache.
-     *
-     * <p>Without an override, the first dot-separated part of the host name is split on
-     * <code>-</code>; if that yields exactly four labels the first label is the region,
-     * otherwise the default region is returned.
-     */
-    private String doResolveRegion(SocketAddress address) {
-        String override = regionOverrides.get(address);
-        if (null != override) {
-            return override;
-        }
-
-        // non-inet addresses have no host name; fall back to their string form
-        String domainName = (address instanceof InetSocketAddress)
-            ? ((InetSocketAddress) address).getHostName()
-            : address.toString();
-
-        String[] domainParts = domainName.split("\\.");
-        if (domainParts.length <= 0) {
-            return DEFAULT_REGION;
-        }
-        String[] labels = domainParts[0].split("-");
-        return labels.length == 4 ? labels[0] : DEFAULT_REGION;
-    }
-
-    @Override
-    public void removeCachedHost(SocketAddress address) {
-        regionMap.remove(address);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/RegionResolver.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/RegionResolver.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/RegionResolver.java
deleted file mode 100644
index eff3aad..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/RegionResolver.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.resolver;
-
-import java.net.SocketAddress;
-
-/**
- * Resolves the region that a socket address belongs to.
- */
-public interface RegionResolver {
-
- /**
- * Resolve the region of the given address.
- *
- * @param address
- * socket address
- * @return name of the region the address belongs to
- */
- String resolveRegion(SocketAddress address);
-
- /**
- * Remove the cached resolution of a host, for implementations that cache results
- * (as {@link DefaultRegionResolver} does).
- *
- * @param address
- * socket address whose cached region should be dropped.
- */
- void removeCachedHost(SocketAddress address);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/package-info.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/package-info.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/package-info.java
deleted file mode 100644
index 4bb53a5..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/resolver/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-/**
- * Resolver to resolve network addresses.
- */
-package com.twitter.distributedlog.client.resolver;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java
deleted file mode 100644
index 6d1e37e..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/ConsistentHashRoutingService.java
+++ /dev/null
@@ -1,500 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.MapDifference;
-import com.google.common.collect.Maps;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.distributedlog.service.DLSocketAddress;
-import com.twitter.finagle.ChannelException;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.Counter;
-import com.twitter.finagle.stats.Gauge;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import com.twitter.util.Function0;
-import java.net.SocketAddress;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.commons.lang3.tuple.Pair;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.Timeout;
-import org.jboss.netty.util.TimerTask;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.collection.Seq;
-
-/**
- * Consistent Hashing Based {@link RoutingService}.
- */
-public class ConsistentHashRoutingService extends ServerSetRoutingService {
-
- private static final Logger logger = LoggerFactory.getLogger(ConsistentHashRoutingService.class);
-
- @Deprecated
- public static ConsistentHashRoutingService of(ServerSetWatcher serverSetWatcher, int numReplicas) {
- return new ConsistentHashRoutingService(serverSetWatcher, numReplicas, 300, NullStatsReceiver.get());
- }
-
- /**
- * Builder helper class to build a consistent hash based {@link RoutingService}.
- *
- * @return builder to build a consistent hash based {@link RoutingService}.
- */
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * Builder for building consistent hash based routing service.
- */
- public static class Builder implements RoutingService.Builder {
-
- private ServerSet serverSet;
- private boolean resolveFromName = false;
- private int numReplicas;
- private int blackoutSeconds = 300;
- private StatsReceiver statsReceiver = NullStatsReceiver.get();
-
- private Builder() {}
-
- public Builder serverSet(ServerSet serverSet) {
- this.serverSet = serverSet;
- return this;
- }
-
- public Builder resolveFromName(boolean enabled) {
- this.resolveFromName = enabled;
- return this;
- }
-
- public Builder numReplicas(int numReplicas) {
- this.numReplicas = numReplicas;
- return this;
- }
-
- public Builder blackoutSeconds(int seconds) {
- this.blackoutSeconds = seconds;
- return this;
- }
-
- public Builder statsReceiver(StatsReceiver statsReceiver) {
- this.statsReceiver = statsReceiver;
- return this;
- }
-
- @Override
- public RoutingService build() {
- checkNotNull(serverSet, "No serverset provided.");
- checkNotNull(statsReceiver, "No stats receiver provided.");
- checkArgument(numReplicas > 0, "Invalid number of replicas : " + numReplicas);
- return new ConsistentHashRoutingService(new TwitterServerSetWatcher(serverSet, resolveFromName),
- numReplicas, blackoutSeconds, statsReceiver);
- }
- }
-
- static class ConsistentHash {
- private final HashFunction hashFunction;
- private final int numOfReplicas;
- private final SortedMap<Long, SocketAddress> circle;
-
- // Stats
- protected final Counter hostAddedCounter;
- protected final Counter hostRemovedCounter;
-
- ConsistentHash(HashFunction hashFunction,
- int numOfReplicas,
- StatsReceiver statsReceiver) {
- this.hashFunction = hashFunction;
- this.numOfReplicas = numOfReplicas;
- this.circle = new TreeMap<Long, SocketAddress>();
-
- this.hostAddedCounter = statsReceiver.counter0("adds");
- this.hostRemovedCounter = statsReceiver.counter0("removes");
- }
-
- private String replicaName(int shardId, int replica, String address) {
- if (shardId < 0) {
- shardId = UNKNOWN_SHARD_ID;
- }
-
- StringBuilder sb = new StringBuilder(100);
- sb.append("shard-");
- sb.append(shardId);
- sb.append('-');
- sb.append(replica);
- sb.append('-');
- sb.append(address);
-
- return sb.toString();
- }
-
- private Long replicaHash(int shardId, int replica, String address) {
- return hashFunction.hashUnencodedChars(replicaName(shardId, replica, address)).asLong();
- }
-
- private Long replicaHash(int shardId, int replica, SocketAddress address) {
- return replicaHash(shardId, replica, address.toString());
- }
-
- public synchronized void add(int shardId, SocketAddress address) {
- String addressStr = address.toString();
- for (int i = 0; i < numOfReplicas; i++) {
- Long hash = replicaHash(shardId, i, addressStr);
- circle.put(hash, address);
- }
- hostAddedCounter.incr();
- }
-
- public synchronized void remove(int shardId, SocketAddress address) {
- for (int i = 0; i < numOfReplicas; i++) {
- long hash = replicaHash(shardId, i, address);
- SocketAddress oldAddress = circle.get(hash);
- if (null != oldAddress && oldAddress.equals(address)) {
- circle.remove(hash);
- }
- }
- hostRemovedCounter.incr();
- }
-
- public SocketAddress get(String key, RoutingContext rContext) {
- long hash = hashFunction.hashUnencodedChars(key).asLong();
- return find(hash, rContext);
- }
-
- private synchronized SocketAddress find(long hash, RoutingContext rContext) {
- if (circle.isEmpty()) {
- return null;
- }
-
- Iterator<Map.Entry<Long, SocketAddress>> iterator =
- circle.tailMap(hash).entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, SocketAddress> entry = iterator.next();
- if (!rContext.isTriedHost(entry.getValue())) {
- return entry.getValue();
- }
- }
- // the tail map has been checked
- iterator = circle.headMap(hash).entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, SocketAddress> entry = iterator.next();
- if (!rContext.isTriedHost(entry.getValue())) {
- return entry.getValue();
- }
- }
-
- return null;
- }
-
- private synchronized Pair<Long, SocketAddress> get(long hash) {
- if (circle.isEmpty()) {
- return null;
- }
-
- if (!circle.containsKey(hash)) {
- SortedMap<Long, SocketAddress> tailMap = circle.tailMap(hash);
- hash = tailMap.isEmpty() ? circle.firstKey() : tailMap.firstKey();
- }
- return Pair.of(hash, circle.get(hash));
- }
-
- synchronized void dumpHashRing() {
- for (Map.Entry<Long, SocketAddress> entry : circle.entrySet()) {
- logger.info(entry.getKey() + " : " + entry.getValue());
- }
- }
-
- }
-
- class BlackoutHost implements TimerTask {
- final int shardId;
- final SocketAddress address;
-
- BlackoutHost(int shardId, SocketAddress address) {
- this.shardId = shardId;
- this.address = address;
- numBlackoutHosts.incrementAndGet();
- }
-
- @Override
- public void run(Timeout timeout) throws Exception {
- numBlackoutHosts.decrementAndGet();
- if (!timeout.isExpired()) {
- return;
- }
- Set<SocketAddress> removedList = new HashSet<SocketAddress>();
- boolean joined;
- // add the shard back
- synchronized (shardId2Address) {
- SocketAddress curHost = shardId2Address.get(shardId);
- if (null != curHost) {
- // there is already new shard joint, so drop the host.
- logger.info("Blackout Shard {} ({}) was already replaced by {} permanently.",
- new Object[] { shardId, address, curHost });
- joined = false;
- } else {
- join(shardId, address, removedList);
- joined = true;
- }
- }
- if (joined) {
- for (RoutingListener listener : listeners) {
- listener.onServerJoin(address);
- }
- } else {
- for (RoutingListener listener : listeners) {
- listener.onServerLeft(address);
- }
- }
- }
- }
-
- protected final HashedWheelTimer hashedWheelTimer;
- protected final HashFunction hashFunction = Hashing.md5();
- protected final ConsistentHash circle;
- protected final Map<Integer, SocketAddress> shardId2Address =
- new HashMap<Integer, SocketAddress>();
- protected final Map<SocketAddress, Integer> address2ShardId =
- new HashMap<SocketAddress, Integer>();
-
- // blackout period
- protected final int blackoutSeconds;
-
- // stats
- protected final StatsReceiver statsReceiver;
- protected final AtomicInteger numBlackoutHosts;
- protected final Gauge numBlackoutHostsGauge;
- protected final Gauge numHostsGauge;
-
- private static final int UNKNOWN_SHARD_ID = -1;
-
- ConsistentHashRoutingService(ServerSetWatcher serverSetWatcher,
- int numReplicas,
- int blackoutSeconds,
- StatsReceiver statsReceiver) {
- super(serverSetWatcher);
- this.circle = new ConsistentHash(hashFunction, numReplicas, statsReceiver.scope("ring"));
- this.hashedWheelTimer = new HashedWheelTimer(new ThreadFactoryBuilder()
- .setNameFormat("ConsistentHashRoutingService-Timer-%d").build());
- this.blackoutSeconds = blackoutSeconds;
- // stats
- this.statsReceiver = statsReceiver;
- this.numBlackoutHosts = new AtomicInteger(0);
- this.numBlackoutHostsGauge = this.statsReceiver.addGauge(gaugeName("num_blackout_hosts"),
- new Function0<Object>() {
- @Override
- public Object apply() {
- return (float) numBlackoutHosts.get();
- }
- });
- this.numHostsGauge = this.statsReceiver.addGauge(gaugeName("num_hosts"),
- new Function0<Object>() {
- @Override
- public Object apply() {
- return (float) address2ShardId.size();
- }
- });
- }
-
- private static Seq<String> gaugeName(String name) {
- return scala.collection.JavaConversions.asScalaBuffer(Arrays.asList(name)).toList();
- }
-
- @Override
- public void startService() {
- super.startService();
- this.hashedWheelTimer.start();
- }
-
- @Override
- public void stopService() {
- this.hashedWheelTimer.stop();
- super.stopService();
- }
-
- @Override
- public Set<SocketAddress> getHosts() {
- synchronized (shardId2Address) {
- return ImmutableSet.copyOf(address2ShardId.keySet());
- }
- }
-
- @Override
- public SocketAddress getHost(String key, RoutingContext rContext)
- throws NoBrokersAvailableException {
- SocketAddress host = circle.get(key, rContext);
- if (null != host) {
- return host;
- }
- throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + rContext);
- }
-
- @Override
- public void removeHost(SocketAddress host, Throwable reason) {
- removeHostInternal(host, Optional.of(reason));
- }
-
- private void removeHostInternal(SocketAddress host, Optional<Throwable> reason) {
- synchronized (shardId2Address) {
- Integer shardId = address2ShardId.remove(host);
- if (null != shardId) {
- SocketAddress curHost = shardId2Address.get(shardId);
- if (null != curHost && curHost.equals(host)) {
- shardId2Address.remove(shardId);
- }
- circle.remove(shardId, host);
- if (reason.isPresent()) {
- if (reason.get() instanceof ChannelException) {
- logger.info("Shard {} ({}) left due to ChannelException, black it out for {} seconds"
- + " (message = {})",
- new Object[] { shardId, host, blackoutSeconds, reason.get().toString() });
- BlackoutHost blackoutHost = new BlackoutHost(shardId, host);
- hashedWheelTimer.newTimeout(blackoutHost, blackoutSeconds, TimeUnit.SECONDS);
- } else {
- logger.info("Shard {} ({}) left due to exception {}",
- new Object[] { shardId, host, reason.get().toString() });
- }
- } else {
- logger.info("Shard {} ({}) left after server set change",
- shardId, host);
- }
- } else if (reason.isPresent()) {
- logger.info("Node {} left due to exception {}", host, reason.get().toString());
- } else {
- logger.info("Node {} left after server set change", host);
- }
- }
- }
-
- /**
- * The caller should synchronize on <i>shardId2Address</i>.
- * @param shardId
- * Shard id of new host joined.
- * @param newHost
- * New host joined.
- * @param removedList
- * Old hosts to remove
- */
- private void join(int shardId, SocketAddress newHost, Set<SocketAddress> removedList) {
- SocketAddress oldHost = shardId2Address.put(shardId, newHost);
- if (null != oldHost) {
- // remove the old host only when a new shard is kicked in to replace it.
- address2ShardId.remove(oldHost);
- circle.remove(shardId, oldHost);
- removedList.add(oldHost);
- logger.info("Shard {} ({}) left permanently.", shardId, oldHost);
- }
- address2ShardId.put(newHost, shardId);
- circle.add(shardId, newHost);
- logger.info("Shard {} ({}) joined to replace ({}).",
- new Object[] { shardId, newHost, oldHost });
- }
-
- /**
- * Reconcile the routing state with the latest membership reported by the server set.
- *
- * <p>Instances with a non-negative shard id keep that id. Instances without one (negative
- * shard id) reuse the id already recorded for their address, or are given a fresh random
- * negative id. Known non-negative shards missing from the update are kept in place, while
- * missing negative shards are removed right away. Listeners are notified about removed hosts
- * first and joined hosts second, after the {@code shardId2Address} lock has been released.
- *
- * @param serviceInstances
- * the complete set of instances currently reported by the server set
- */
- @Override
- protected synchronized void performServerSetChange(ImmutableSet<DLSocketAddress> serviceInstances) {
- Set<SocketAddress> joinedList = new HashSet<SocketAddress>();
- Set<SocketAddress> removedList = new HashSet<SocketAddress>();
-
- Map<Integer, SocketAddress> newMap = new HashMap<Integer, SocketAddress>();
- synchronized (shardId2Address) {
- for (DLSocketAddress serviceInstance : serviceInstances) {
- if (serviceInstance.getShard() >= 0) {
- newMap.put(serviceInstance.getShard(), serviceInstance.getSocketAddress());
- } else {
- Integer shard = address2ShardId.get(serviceInstance.getSocketAddress());
- if (null == shard) {
- // Assign a random negative shardId. It must be unused by the current members
- // and by the members already assigned in this update, otherwise the put below
- // would overwrite (and silently drop) another new host.
- int shardId;
- do {
- shardId = Math.min(-1 , (int) (Math.random() * Integer.MIN_VALUE));
- } while (null != shardId2Address.get(shardId) || newMap.containsKey(shardId));
- shard = shardId;
- }
- newMap.put(shard, serviceInstance.getSocketAddress());
- }
- }
- }
-
- Map<Integer, SocketAddress> left;
- synchronized (shardId2Address) {
- MapDifference<Integer, SocketAddress> difference =
- Maps.difference(shardId2Address, newMap);
- left = difference.entriesOnlyOnLeft();
- for (Map.Entry<Integer, SocketAddress> shardEntry : left.entrySet()) {
- int shard = shardEntry.getKey();
- if (shard >= 0) {
- SocketAddress host = shardId2Address.get(shard);
- if (null != host) {
- // we don't remove those hosts that just disappeared on serverset proactively,
- // since it might be just because serverset become flaky
- // address2ShardId.remove(host);
- // circle.remove(shard, host);
- logger.info("Shard {} ({}) left temporarily.", shard, host);
- }
- } else {
- // shard id is negative - they are resolved from finagle name, which instances don't have shard id
- // in this case, if they are removed from serverset, we removed them directly
- SocketAddress host = shardEntry.getValue();
- if (null != host) {
- removeHostInternal(host, Optional.<Throwable>absent());
- removedList.add(host);
- }
- }
- }
- // we need to find if any shards are replacing old shards
- for (Map.Entry<Integer, SocketAddress> shard : newMap.entrySet()) {
- SocketAddress oldHost = shardId2Address.get(shard.getKey());
- SocketAddress newHost = shard.getValue();
- if (!newHost.equals(oldHost)) {
- join(shard.getKey(), newHost, removedList);
- joinedList.add(newHost);
- }
- }
- }
-
- // notify listeners outside of the shardId2Address lock: departures first, then joins
- for (SocketAddress addr : removedList) {
- for (RoutingListener listener : listeners) {
- listener.onServerLeft(addr);
- }
- }
-
- for (SocketAddress addr : joinedList) {
- for (RoutingListener listener : listeners) {
- listener.onServerJoin(addr);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java
deleted file mode 100644
index eeba4ac..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/NameServerSet.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import com.google.common.collect.ImmutableSet;
-import com.twitter.common.base.Command;
-import com.twitter.common.base.Commands;
-import com.twitter.common.zookeeper.Group;
-import com.twitter.common.zookeeper.ServerSet;
-import com.twitter.finagle.Addr;
-import com.twitter.finagle.Address;
-import com.twitter.finagle.Name;
-import com.twitter.finagle.Resolver$;
-import com.twitter.thrift.Endpoint;
-import com.twitter.thrift.ServiceInstance;
-import com.twitter.thrift.Status;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.runtime.AbstractFunction1;
-import scala.runtime.BoxedUnit;
-
-/**
- * Finagle Name based {@link ServerSet} implementation.
- */
-class NameServerSet implements ServerSet {
-
- private static final Logger logger = LoggerFactory.getLogger(NameServerSet.class);
-
- private volatile Set<HostChangeMonitor<ServiceInstance>> watchers =
- new HashSet<HostChangeMonitor<ServiceInstance>>();
- private volatile ImmutableSet<ServiceInstance> hostSet = ImmutableSet.of();
- private AtomicBoolean resolutionPending = new AtomicBoolean(true);
-
- public NameServerSet(String nameStr) {
- Name name;
- try {
- name = Resolver$.MODULE$.eval(nameStr);
- } catch (Exception exc) {
- logger.error("Exception in Resolver.eval for name {}", nameStr, exc);
- // Since this is called from various places that dont handle specific exceptions,
- // we have no option than to throw a runtime exception to halt the control flow
- // This should only happen in case of incorrect configuration. Having a log message
- // would help identify the problem during tests
- throw new RuntimeException(exc);
- }
- initialize(name);
- }
-
- public NameServerSet(Name name) {
- initialize(name);
- }
-
- private void initialize(Name name) {
- if (name instanceof TestName) {
- ((TestName) name).changes(new AbstractFunction1<Addr, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Addr varAddr) {
- return NameServerSet.this.respondToChanges(varAddr);
- }
- });
- } else if (name instanceof Name.Bound) {
- ((Name.Bound) name).addr().changes().respond(new AbstractFunction1<Addr, BoxedUnit>() {
- @Override
- public BoxedUnit apply(Addr varAddr) {
- return NameServerSet.this.respondToChanges(varAddr);
- }
- });
- } else {
- logger.error("NameServerSet only supports Name.Bound. While the resolved name {} was {}",
- name, name.getClass());
- throw new UnsupportedOperationException("NameServerSet only supports Name.Bound");
- }
- }
-
- private ServiceInstance endpointAddressToServiceInstance(Address endpointAddress) {
- if (endpointAddress instanceof Address.Inet) {
- InetSocketAddress inetSocketAddress = ((Address.Inet) endpointAddress).addr();
- Endpoint endpoint = new Endpoint(inetSocketAddress.getHostString(), inetSocketAddress.getPort());
- HashMap<String, Endpoint> map = new HashMap<String, Endpoint>();
- map.put("thrift", endpoint);
- return new ServiceInstance(
- endpoint,
- map,
- Status.ALIVE);
- } else {
- logger.error("We expect InetSocketAddress while the resolved address {} was {}",
- endpointAddress, endpointAddress.getClass());
- throw new UnsupportedOperationException("invalid endpoint address: " + endpointAddress);
- }
- }
-
-
- private BoxedUnit respondToChanges(Addr addr) {
- ImmutableSet<ServiceInstance> oldHostSet = ImmutableSet.copyOf(hostSet);
-
- ImmutableSet<ServiceInstance> newHostSet = oldHostSet;
-
- if (addr instanceof Addr.Bound) {
- scala.collection.immutable.Set<Address> endpointAddresses = ((Addr.Bound) addr).addrs();
- scala.collection.Iterator<Address> endpointAddressesIterator = endpointAddresses.toIterator();
- HashSet<ServiceInstance> serviceInstances = new HashSet<ServiceInstance>();
- while (endpointAddressesIterator.hasNext()) {
- serviceInstances.add(endpointAddressToServiceInstance(endpointAddressesIterator.next()));
- }
- newHostSet = ImmutableSet.copyOf(serviceInstances);
-
- } else if (addr instanceof Addr.Failed) {
- logger.error("Name resolution failed", ((Addr.Failed) addr).cause());
- newHostSet = ImmutableSet.of();
- } else if (addr.toString().equals("Pending")) {
- logger.info("Name resolution pending");
- newHostSet = oldHostSet;
- } else if (addr.toString().equals("Neg")) {
- newHostSet = ImmutableSet.of();
- } else {
- logger.error("Invalid Addr type: {}", addr.getClass().getName());
- throw new UnsupportedOperationException("Invalid Addr type:" + addr.getClass().getName());
- }
-
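- // Identity (not equals) comparison: only a different set object counts as an update.
- // NOTE(review): an update that resolves to the shared empty set presumably is the same
- // instance as the initial hostSet and is therefore skipped - confirm this is intended.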
- if (oldHostSet != newHostSet) {
- logger.info("NameServerSet updated: {} -> {}", hostSetToString(oldHostSet), hostSetToString(newHostSet));
- resolutionPending.set(false);
- hostSet = newHostSet;
- synchronized (watchers) {
- for (HostChangeMonitor<ServiceInstance> watcher: watchers) {
- watcher.onChange(newHostSet);
- }
- }
-
- }
-
- return BoxedUnit.UNIT;
- }
-
-
- private String hostSetToString(ImmutableSet<ServiceInstance> hostSet) {
- StringBuilder result = new StringBuilder();
- result.append("(");
- for (ServiceInstance serviceInstance : hostSet) {
- Endpoint endpoint = serviceInstance.getServiceEndpoint();
- result.append(String.format(" %s:%d", endpoint.getHost(), endpoint.getPort()));
- }
- result.append(" )");
-
- return result.toString();
- }
-
-
- /**
- * Attempts to join a server set for this logical service group.
- *
- * @param endpoint the primary service endpoint
- * @param additionalEndpoints and additional endpoints keyed by their logical name
- * @param status the current service status
- * @return an EndpointStatus object that allows the endpoint to adjust its status
- * @throws Group.JoinException if there was a problem joining the server set
- * @throws InterruptedException if interrupted while waiting to join the server set
- * @deprecated The status field is deprecated. Please use {@link #join(java.net.InetSocketAddress, java.util.Map)}
- */
- @Override
- public EndpointStatus join(InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- Status status)
- throws Group.JoinException, InterruptedException {
- throw new UnsupportedOperationException("NameServerSet does not support join");
- }
-
- /**
- * Attempts to join a server set for this logical service group.
- *
- * @param endpoint the primary service endpoint
- * @param additionalEndpoints and additional endpoints keyed by their logical name
- * @return an EndpointStatus object that allows the endpoint to adjust its status
- * @throws Group.JoinException if there was a problem joining the server set
- * @throws InterruptedException if interrupted while waiting to join the server set
- */
- @Override
- public EndpointStatus join(InetSocketAddress endpoint, Map<String, InetSocketAddress> additionalEndpoints)
- throws Group.JoinException, InterruptedException {
- throw new UnsupportedOperationException("NameServerSet does not support join");
- }
-
- /**
- * Attempts to join a server set for this logical service group.
- *
- * @param endpoint the primary service endpoint
- * @param additionalEndpoints and additional endpoints keyed by their logical name
- * @param shardId Unique shard identifier for this member of the service.
- * @return an EndpointStatus object that allows the endpoint to adjust its status
- * @throws Group.JoinException if there was a problem joining the server set
- * @throws InterruptedException if interrupted while waiting to join the server set
- */
- @Override
- public EndpointStatus join(InetSocketAddress endpoint,
- Map<String, InetSocketAddress> additionalEndpoints,
- int shardId)
- throws Group.JoinException, InterruptedException {
- throw new UnsupportedOperationException("NameServerSet does not support join");
- }
-
- /**
- * Registers a monitor to receive change notices for this server set as long as this jvm process
- * is alive. Blocks until the initial server set can be gathered and delivered to the monitor.
- * The monitor will be notified if the membership set or parameters of existing members have
- * changed.
- *
- * @param monitor the server set monitor to call back when the host set changes
- * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set
- * @deprecated Deprecated in favor of {@link #watch(com.twitter.common.net.pool.DynamicHostSet.HostChangeMonitor)}
- */
- @Deprecated
- @Override
- public void monitor(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
- throw new UnsupportedOperationException("NameServerSet does not support monitor");
- }
-
- /**
- * Registers a monitor to receive change notices for this server set as long as this jvm process
- * is alive. The monitor will be notified if the membership set or parameters of existing members
- * have changed.
- *
- * <p>This implementation does not block. If a host set has already been published, the monitor
- * is called back immediately with the current host set; otherwise its first callback happens
- * when the next host set update is published.
- *
- * @param monitor the server set monitor to call back when the host set changes
- * @return A command which, when executed, will stop monitoring the host set.
- * This implementation always returns {@link Commands#NOOP}.
- * @throws com.twitter.common.net.pool.DynamicHostSet.MonitorException if there is a problem monitoring the host set
- */
- @Override
- public Command watch(HostChangeMonitor<ServiceInstance> monitor) throws MonitorException {
- // First add the monitor to the watchers so that it does not miss any changes and invoke
- // the onChange method
- synchronized (watchers) {
- watchers.add(monitor);
- }
-
- // A plain read is enough: the flag starts as true and is only ever set to false.
- if (!resolutionPending.get()) {
- monitor.onChange(hostSet);
- }
-
- return Commands.NOOP; // Return value is not used
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java
deleted file mode 100644
index 4714270..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RegionsRoutingService.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Sets;
-import com.twitter.distributedlog.client.resolver.RegionResolver;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.NullStatsReceiver;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Chain multiple routing services.
- */
-public class RegionsRoutingService implements RoutingService {
-
- private static final Logger logger = LoggerFactory.getLogger(RegionsRoutingService.class);
-
- /**
- * Create a multiple regions routing services based on a list of region routing {@code services}.
- *
- * <p>It is deprecated. Please use {@link Builder} to build multiple regions routing service.
- *
- * @param regionResolver region resolver
- * @param services a list of region routing services.
- * @return multiple regions routing service
- * @see Builder
- */
- @Deprecated
- public static RegionsRoutingService of(RegionResolver regionResolver,
- RoutingService...services) {
- return new RegionsRoutingService(regionResolver, services);
- }
-
- /**
- * Create a builder to build a multiple-regions routing service.
- *
- * @return builder to build a multiple-regions routing service.
- */
- public static Builder newBuilder() {
- return new Builder();
- }
-
- /**
- * Builder to build a multiple-regions routing service.
- */
- public static class Builder implements RoutingService.Builder {
-
- private RegionResolver resolver;
- private RoutingService.Builder[] routingServiceBuilders;
- private StatsReceiver statsReceiver = NullStatsReceiver.get();
-
- private Builder() {}
-
- public Builder routingServiceBuilders(RoutingService.Builder...builders) {
- this.routingServiceBuilders = builders;
- return this;
- }
-
- public Builder resolver(RegionResolver regionResolver) {
- this.resolver = regionResolver;
- return this;
- }
-
- @Override
- public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
- this.statsReceiver = statsReceiver;
- return this;
- }
-
- @Override
- public RegionsRoutingService build() {
- checkNotNull(routingServiceBuilders, "No routing service builder provided.");
- checkNotNull(resolver, "No region resolver provided.");
- checkNotNull(statsReceiver, "No stats receiver provided");
- RoutingService[] services = new RoutingService[routingServiceBuilders.length];
- for (int i = 0; i < services.length; i++) {
- String statsScope;
- if (0 == i) {
- statsScope = "local";
- } else {
- statsScope = "remote_" + i;
- }
- services[i] = routingServiceBuilders[i]
- .statsReceiver(statsReceiver.scope(statsScope))
- .build();
- }
- return new RegionsRoutingService(resolver, services);
- }
- }
-
- protected final RegionResolver regionResolver;
- protected final RoutingService[] routingServices;
-
- private RegionsRoutingService(RegionResolver resolver,
- RoutingService[] routingServices) {
- this.regionResolver = resolver;
- this.routingServices = routingServices;
- }
-
- @Override
- public Set<SocketAddress> getHosts() {
- Set<SocketAddress> hosts = Sets.newHashSet();
- for (RoutingService rs : routingServices) {
- hosts.addAll(rs.getHosts());
- }
- return hosts;
- }
-
- @Override
- public void startService() {
- for (RoutingService service : routingServices) {
- service.startService();
- }
- logger.info("Regions Routing Service Started");
- }
-
- @Override
- public void stopService() {
- for (RoutingService service : routingServices) {
- service.stopService();
- }
- logger.info("Regions Routing Service Stopped");
- }
-
- @Override
- public RoutingService registerListener(RoutingListener listener) {
- for (RoutingService service : routingServices) {
- service.registerListener(listener);
- }
- return this;
- }
-
- /**
- * Unregister {@code listener} from every underlying region routing service.
- *
- * @param listener
- * listener to remove
- * @return this routing service
- */
- @Override
- public RoutingService unregisterListener(RoutingListener listener) {
- for (RoutingService service : routingServices) {
- // delegate to unregisterListener so the listener is really removed from each region
- service.unregisterListener(listener);
- }
- return this;
- }
-
- @Override
- public SocketAddress getHost(String key, RoutingContext routingContext)
- throws NoBrokersAvailableException {
- for (RoutingService service : routingServices) {
- try {
- SocketAddress addr = service.getHost(key, routingContext);
- if (routingContext.hasUnavailableRegions()) {
- // current region is unavailable
- String region = regionResolver.resolveRegion(addr);
- if (routingContext.isUnavailableRegion(region)) {
- continue;
- }
- }
- if (!routingContext.isTriedHost(addr)) {
- return addr;
- }
- } catch (NoBrokersAvailableException nbae) {
- // if there isn't broker available in current service, try next service.
- logger.debug("No brokers available in region {} : ", service, nbae);
- }
- }
- throw new NoBrokersAvailableException("No host found for " + key + ", routing context : " + routingContext);
- }
-
- @Override
- public void removeHost(SocketAddress address, Throwable reason) {
- for (RoutingService service : routingServices) {
- service.removeHost(address, reason);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java
deleted file mode 100644
index 56446c1..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingService.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import com.twitter.distributedlog.client.resolver.RegionResolver;
-import com.twitter.distributedlog.thrift.service.StatusCode;
-import com.twitter.finagle.NoBrokersAvailableException;
-import com.twitter.finagle.stats.StatsReceiver;
-import java.net.SocketAddress;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Routing Service provides mechanism how to route requests.
- */
-public interface RoutingService {
-
- /**
- * Builder to build routing service.
- */
- interface Builder {
-
- /**
- * Set the stats receiver for the routing service being built.
- *
- * @param statsReceiver
- * stats receiver
- * @return routing service builder
- */
- Builder statsReceiver(StatsReceiver statsReceiver);
-
- /**
- * Build the routing service.
- *
- * @return built routing service
- */
- RoutingService build();
-
- }
-
- /**
- * Listener for server changes on routing service.
- */
- interface RoutingListener {
- /**
- * Triggered when a server has left.
- *
- * @param address address of the server that left.
- */
- void onServerLeft(SocketAddress address);
-
- /**
- * Triggered when a server has joined.
- *
- * @param address address of the server that joined.
- */
- void onServerJoin(SocketAddress address);
- }
-
- /**
- * Routing Context of a request.
- */
- class RoutingContext {
-
- public static RoutingContext of(RegionResolver resolver) {
- return new RoutingContext(resolver);
- }
-
- final RegionResolver regionResolver;
- final Map<SocketAddress, StatusCode> triedHosts;
- final Set<String> unavailableRegions;
-
- private RoutingContext(RegionResolver regionResolver) {
- this.regionResolver = regionResolver;
- this.triedHosts = new HashMap<SocketAddress, StatusCode>();
- this.unavailableRegions = new HashSet<String>();
- }
-
- @Override
- public synchronized String toString() {
- return "(tried hosts=" + triedHosts + ")";
- }
-
- /**
- * Add tried host to routing context.
- *
- * @param socketAddress
- * socket address of tried host.
- * @param code
- * status code returned from tried host.
- * @return routing context.
- */
- public synchronized RoutingContext addTriedHost(SocketAddress socketAddress, StatusCode code) {
- this.triedHosts.put(socketAddress, code);
- if (StatusCode.REGION_UNAVAILABLE == code) {
- unavailableRegions.add(regionResolver.resolveRegion(socketAddress));
- }
- return this;
- }
-
- /**
- * Is the host <i>address</i> already tried.
- *
- * @param address
- * socket address to check
- * @return true if the address is already tried, otherwise false.
- */
- public synchronized boolean isTriedHost(SocketAddress address) {
- return this.triedHosts.containsKey(address);
- }
-
- /**
- * Whether encountered unavailable regions.
- *
- * @return true if encountered unavailable regions, otherwise false.
- */
- public synchronized boolean hasUnavailableRegions() {
- return !unavailableRegions.isEmpty();
- }
-
- /**
- * Whether the <i>region</i> is unavailable.
- *
- * @param region
- * region
- * @return true if the region is unavailable, otherwise false.
- */
- public synchronized boolean isUnavailableRegion(String region) {
- return unavailableRegions.contains(region);
- }
-
- }
-
- /**
- * Start routing service.
- */
- void startService();
-
- /**
- * Stop routing service.
- */
- void stopService();
-
- /**
- * Register routing listener.
- *
- * @param listener routing listener.
- * @return routing service.
- */
- RoutingService registerListener(RoutingListener listener);
-
- /**
- * Unregister routing listener.
- *
- * @param listener routing listener.
- * @return routing service.
- */
- RoutingService unregisterListener(RoutingListener listener);
-
- /**
- * Get all the hosts that are available in the routing service.
- *
- * @return all the hosts
- */
- Set<SocketAddress> getHosts();
-
- /**
- * Get the host to route the request by <i>key</i>.
- *
- * @param key
- * key to route the request.
- * @param rContext
- * routing context.
- * @return host to route the request
- * @throws NoBrokersAvailableException
- * if no host can be found to route the request.
- */
- SocketAddress getHost(String key, RoutingContext rContext)
- throws NoBrokersAvailableException;
-
- /**
- * Remove the host <i>address</i> for a specific <i>reason</i>.
- *
- * @param address
- * host address to remove
- * @param reason
- * reason to remove the host
- */
- void removeHost(SocketAddress address, Throwable reason);
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingServiceProvider.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingServiceProvider.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingServiceProvider.java
deleted file mode 100644
index 22cd222..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingServiceProvider.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import com.twitter.finagle.stats.StatsReceiver;
-
-/**
- * A {@link RoutingService.Builder} that hands out an already constructed routing service.
- */
-class RoutingServiceProvider implements RoutingService.Builder {
-
- final RoutingService routingService;
-
- RoutingServiceProvider(RoutingService service) {
- routingService = service;
- }
-
- /**
- * Accepts the stats receiver without using it; the routing service is already built.
- *
- * @param statsReceiver stats receiver (ignored)
- * @return this builder
- */
- @Override
- public RoutingService.Builder statsReceiver(StatsReceiver statsReceiver) {
- return this;
- }
-
- /**
- * Return the routing service given at construction time.
- *
- * @return the provided routing service
- */
- @Override
- public RoutingService build() {
- return routingService;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-distributedlog/blob/b44820b5/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java
----------------------------------------------------------------------
diff --git a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java b/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java
deleted file mode 100644
index 2302e18..0000000
--- a/distributedlog-client/src/main/java/com/twitter/distributedlog/client/routing/RoutingUtils.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.twitter.distributedlog.client.routing;
-
-import com.twitter.common.zookeeper.ServerSet;
-import java.net.SocketAddress;
-
-/**
- * Utils for routing services.
- */
-public class RoutingUtils {
-
- private static final int NUM_CONSISTENT_HASH_REPLICAS = 997;
-
- /**
- * Building routing service from <code>finagleNameStr</code>.
- *
- * @param finagleNameStr
- * finagle name str of a service
- * @return routing service builder
- */
- public static RoutingService.Builder buildRoutingService(String finagleNameStr) {
- if (!finagleNameStr.startsWith("serverset!")
- && !finagleNameStr.startsWith("inet!")
- && !finagleNameStr.startsWith("zk!")) {
- // We only support serverset based names at the moment
- throw new UnsupportedOperationException("Finagle Name format not supported for name: " + finagleNameStr);
- }
- return buildRoutingService(new NameServerSet(finagleNameStr), true);
- }
-
- /**
- * Building routing service from <code>serverSet</code>.
- *
- * @param serverSet
- * server set of a service
- * @return routing service builder
- */
- public static RoutingService.Builder buildRoutingService(ServerSet serverSet) {
- return buildRoutingService(serverSet, false);
- }
-
- /**
- * Building routing service from <code>address</code>.
- *
- * @param address
- * host to route the requests
- * @return routing service builder
- */
- public static RoutingService.Builder buildRoutingService(SocketAddress address) {
- return SingleHostRoutingService.newBuilder().address(address);
- }
-
- /**
- * Build routing service builder of a routing service <code>routingService</code>.
- *
- * @param routingService
- * routing service to provide
- * @return routing service builder
- */
- public static RoutingService.Builder buildRoutingService(RoutingService routingService) {
- return new RoutingServiceProvider(routingService);
- }
-
- private static RoutingService.Builder buildRoutingService(ServerSet serverSet,
- boolean resolveFromName) {
- return ConsistentHashRoutingService.newBuilder()
- .serverSet(serverSet)
- .resolveFromName(resolveFromName)
- .numReplicas(NUM_CONSISTENT_HASH_REPLICAS);
- }
-
-}