You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2017/07/05 20:59:34 UTC
[2/2] kudu git commit: [java] separating Connection
[java] separating Connection
This patch separates lower-level, connection-related functionality
from the TabletClient class into the new Connection class.
The updated TabletClient class has been renamed to RpcProxy.
Also, this patch contains other minor updates to related code.
In addition, this patch addresses KUDU-1878.
This work is part of KUDU-2013.
Change-Id: Id4ac81d9454631e7501c31576c24f85e968bb871
Reviewed-on: http://gerrit.cloudera.org:8080/7146
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Alexey Serbin <as...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/58248841
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/58248841
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/58248841
Branch: refs/heads/master
Commit: 58248841f213a64683ee217f025f0a38a8450f74
Parents: 7e74527
Author: Alexey Serbin <as...@cloudera.com>
Authored: Thu Jun 1 18:39:13 2017 -0700
Committer: Alexey Serbin <as...@cloudera.com>
Committed: Wed Jul 5 20:58:46 2017 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 277 +++----
.../apache/kudu/client/ConnectToCluster.java | 112 ++-
.../java/org/apache/kudu/client/Connection.java | 634 +++++++++++++++
.../org/apache/kudu/client/ConnectionCache.java | 269 ++-----
.../java/org/apache/kudu/client/KuduRpc.java | 6 +-
.../java/org/apache/kudu/client/Negotiator.java | 5 +-
.../org/apache/kudu/client/RemoteTablet.java | 53 +-
.../java/org/apache/kudu/client/RpcProxy.java | 406 ++++++++++
.../java/org/apache/kudu/client/ServerInfo.java | 4 +-
.../org/apache/kudu/client/TabletClient.java | 779 -------------------
.../org/apache/kudu/client/BaseKuduTest.java | 6 +-
.../java/org/apache/kudu/client/ITClient.java | 7 +-
.../kudu/client/ITFaultTolerantScanner.java | 11 +-
.../kudu/client/ITNonFaultTolerantScanner.java | 4 +-
.../kudu/client/ITScannerMultiTablet.java | 21 +-
.../org/apache/kudu/client/MiniKuduCluster.java | 73 +-
.../apache/kudu/client/TestAsyncKuduClient.java | 16 +-
.../kudu/client/TestAsyncKuduSession.java | 13 +-
.../apache/kudu/client/TestConnectionCache.java | 109 ++-
.../apache/kudu/client/TestRemoteTablet.java | 31 +-
.../org/apache/kudu/client/TestTimeouts.java | 1 +
21 files changed, 1504 insertions(+), 1333 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 019a3f5..a304d05 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -27,8 +27,10 @@
package org.apache.kudu.client;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
import static org.apache.kudu.client.ExternalConsistencyMode.CLIENT_PROPAGATED;
+import java.net.InetAddress;
import java.net.UnknownHostException;
import java.security.cert.CertificateException;
import java.util.ArrayList;
@@ -43,6 +45,8 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.security.auth.Subject;
@@ -67,6 +71,7 @@ import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.kudu.Common;
import org.apache.kudu.Schema;
import org.apache.kudu.master.Master;
import org.apache.kudu.master.Master.GetTableLocationsResponsePB;
@@ -126,7 +131,8 @@ public class AsyncKuduClient implements AutoCloseable {
* a lookup of a single partition (e.g. for a write), or re-looking-up a tablet with
* stale information.
*/
- static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
+ private static final int FETCH_TABLETS_PER_POINT_LOOKUP = 10;
+
/**
* The number of tablets to fetch from the master when looking up a range of
* tablets.
@@ -142,16 +148,18 @@ public class AsyncKuduClient implements AutoCloseable {
private final ConcurrentHashMap<String, TableLocationsCache> tableLocations =
new ConcurrentHashMap<>();
+ /** A cache to keep track of already opened connections to Kudu servers. */
private final ConnectionCache connectionCache;
@GuardedBy("sessions")
private final Set<AsyncKuduSession> sessions = new HashSet<>();
- // Since the masters also go through TabletClient, we need to treat them as if they were a normal
- // table. We'll use the following fake table name to identify places where we need special
+ // Since RPCs to the masters also go through RpcProxy, we need to treat them as if they were a
+ // normal table. We'll use the following fake table name to identify places where we need special
// handling.
+ // TODO(aserbin) clean this up
static final String MASTER_TABLE_NAME_PLACEHOLDER = "Kudu Master";
- final KuduTable masterTable;
+ private final KuduTable masterTable;
private final List<HostAndPort> masterAddresses;
private final HashedWheelTimer timer;
@@ -212,7 +220,40 @@ public class AsyncKuduClient implements AutoCloseable {
this.timer = b.timer;
String clientId = UUID.randomUUID().toString().replace("-", "");
this.requestTracker = new RequestTracker(clientId);
- this.connectionCache = new ConnectionCache(this);
+ this.connectionCache = new ConnectionCache(
+ securityContext, defaultSocketReadTimeoutMs, timer, channelFactory);
+ }
+
+ /**
+ * Get a proxy to send RPC calls to the specified server.
+ *
+ * @param serverInfo server's information
+ * @return the proxy object bound to the target server
+ */
+ @Nonnull
+ RpcProxy newRpcProxy(final ServerInfo serverInfo) {
+ Preconditions.checkNotNull(serverInfo);
+ return new RpcProxy(this, connectionCache.getConnection(serverInfo));
+ }
+
+ /**
+ * Get a proxy to send RPC calls to Kudu master at the specified end-point.
+ *
+ * @param hostPort master end-point
+ * @return the proxy object bound to the target master
+ */
+ @Nullable
+ RpcProxy newMasterRpcProxy(HostAndPort hostPort) {
+ // We should have a UUID to construct ServerInfo for the master, but we have a chicken
+ // and egg problem, we first need to communicate with the masters to find out about them,
+ // and that's what we're trying to do. The UUID is just used for logging and cache key,
+ // so instead we just use concatenation of master host and port, prefixed with "master-".
+ final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+ if (inetAddress == null) {
+ // TODO(todd): should we log the resolution failure? throw an exception?
+ return null;
+ }
+ return newRpcProxy(new ServerInfo("master-" + hostPort.toString(), hostPort, inetAddress));
}
/**
@@ -374,7 +415,7 @@ public class AsyncKuduClient implements AutoCloseable {
return sendRpcToTablet(rpc);
}
- Deferred<GetTableSchemaResponse> getTableSchema(String name) {
+ private Deferred<GetTableSchemaResponse> getTableSchema(String name) {
GetTableSchemaRequest rpc = new GetTableSchemaRequest(this.masterTable, name);
rpc.setTimeoutMillis(defaultAdminOperationTimeoutMs);
return sendRpcToTablet(rpc);
@@ -501,9 +542,9 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * This callback will be repeatadly used when opening a table until it is done being created.
+ * This callback will be repeatedly used when opening a table until it is done being created.
*/
- Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
+ private Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB> getOpenTableCB(
final KuduRpc<KuduTable> rpc, final KuduTable table) {
return new Callback<Deferred<KuduTable>, Master.IsCreateTableDoneResponsePB>() {
@Override
@@ -635,18 +676,6 @@ public class AsyncKuduClient implements AutoCloseable {
return requestTracker;
}
- HashedWheelTimer getTimer() {
- return timer;
- }
-
- ClientSocketChannelFactory getChannelFactory() {
- return channelFactory;
- }
-
- SecurityContext getSecurityContext() {
- return securityContext;
- }
-
/**
* Creates a new {@link AsyncKuduScanner.AsyncKuduScannerBuilder} for a particular table.
* @param table the name of the table you intend to scan.
@@ -691,23 +720,20 @@ public class AsyncKuduClient implements AutoCloseable {
*/
Deferred<AsyncKuduScanner.Response> scanNextRows(final AsyncKuduScanner scanner) {
RemoteTablet tablet = scanner.currentTablet();
- assert (tablet != null);
+ Preconditions.checkNotNull(tablet);
KuduRpc<AsyncKuduScanner.Response> nextRequest = scanner.getNextRowsRequest();
- String uuid = tablet.getReplicaSelectedUUID(nextRequest.getReplicaSelection());
- TabletClient client = connectionCache.getClient(uuid);
// Important to increment the attempts before the next if statement since
// getSleepTimeForRpc() relies on it if the client is null or dead.
nextRequest.attempt++;
- if (client == null || !client.isAlive()) {
- // A null client means we either don't know about this tablet anymore (unlikely) or we
- // couldn't find a leader (which could be triggered by a read timeout).
- // We'll first delay the RPC in case things take some time to settle down, then retry.
- Status statusRemoteError = Status.RemoteError("Not connected to server " + uuid +
- " will retry after a delay");
- return delayedSendRpcToTablet(nextRequest, new RecoverableException(statusRemoteError));
+ final ServerInfo info = tablet.getReplicaSelectedServerInfo(nextRequest.getReplicaSelection());
+ if (info == null) {
+ return delayedSendRpcToTablet(nextRequest, new RecoverableException(Status.RemoteError(
+ String.format("No information on servers hosting tablet %s, will retry later",
+ tablet.getTabletId()))));
}
+
Deferred<AsyncKuduScanner.Response> d = nextRequest.getDeferred();
- client.sendRpc(nextRequest);
+ RpcProxy.sendRpc(this, connectionCache.getConnection(info), nextRequest);
return d;
}
@@ -723,55 +749,19 @@ public class AsyncKuduClient implements AutoCloseable {
if (tablet == null) {
return Deferred.fromResult(null);
}
-
- final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
- final TabletClient client = connectionCache.getClient(
- tablet.getReplicaSelectedUUID(closeRequest.getReplicaSelection()));
- if (client == null || !client.isAlive()) {
- // Oops, we couldn't find a tablet server that hosts this tablet. Our
- // cache was probably invalidated while the client was scanning. So
- // we can't close this scanner properly.
- LOG.warn("Cannot close {} properly, no connection open for {}", scanner, tablet);
+ final KuduRpc<AsyncKuduScanner.Response> closeRequest = scanner.getCloseRequest();
+ final ServerInfo info = tablet.getReplicaSelectedServerInfo(closeRequest.getReplicaSelection());
+ if (info == null) {
return Deferred.fromResult(null);
}
final Deferred<AsyncKuduScanner.Response> d = closeRequest.getDeferred();
closeRequest.attempt++;
- client.sendRpc(closeRequest);
+ RpcProxy.sendRpc(this, connectionCache.getConnection(info), closeRequest);
return d;
}
/**
- * Forcefully shuts down the RemoteTablet connection and
- * fails all outstanding RPCs.
- *
- * @param tablet the given tablet
- * @param replicaSelection replica selection mechanism to use
- */
- @VisibleForTesting
- void shutdownConnection(RemoteTablet tablet,
- ReplicaSelection replicaSelection) {
- TabletClient client = connectionCache.getClient(
- tablet.getReplicaSelectedUUID(replicaSelection));
- client.shutdown();
- }
-
- /**
- * Forcefully disconnects the RemoteTablet connection and
- * fails all outstanding RPCs.
- *
- * @param tablet the given tablet
- * @param replicaSelection replica selection mechanism to use
- */
- @VisibleForTesting
- void disconnect(RemoteTablet tablet,
- ReplicaSelection replicaSelection) {
- TabletClient client = connectionCache.getClient(
- tablet.getReplicaSelectedUUID(replicaSelection));
- client.disconnect();
- }
-
- /**
* Sends the provided {@link KuduRpc} to the tablet server hosting the leader
* of the tablet identified by the RPC's table and partition key.
*
@@ -812,15 +802,12 @@ public class AsyncKuduClient implements AutoCloseable {
// If we found a tablet, we'll try to find the TS to talk to.
if (entry != null) {
RemoteTablet tablet = entry.getTablet();
- String uuid = tablet.getReplicaSelectedUUID(request.getReplicaSelection());
- if (uuid != null) {
+ ServerInfo info = tablet.getReplicaSelectedServerInfo(request.getReplicaSelection());
+ if (info != null) {
Deferred<R> d = request.getDeferred();
request.setTablet(tablet);
- TabletClient client = connectionCache.getLiveClient(uuid);
- if (client != null) {
- client.sendRpc(request);
- return d;
- }
+ RpcProxy.sendRpc(this, connectionCache.getConnection(info), request);
+ return d;
}
}
@@ -923,7 +910,8 @@ public class AsyncKuduClient implements AutoCloseable {
* @param <R> Request's return type.
* @return An errback.
*/
- <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(final KuduRpc<R> request) {
+ private <R> Callback<Exception, Exception> getDelayedIsCreateTableDoneErrback(
+ final KuduRpc<R> request) {
return new Callback<Exception, Exception>() {
@Override
public Exception call(Exception e) throws Exception {
@@ -944,26 +932,26 @@ public class AsyncKuduClient implements AutoCloseable {
* @param errback the errback to call if something goes wrong when calling IsCreateTableDone
* @return Deferred used to track the provided KuduRpc
*/
- <R> Deferred<R> delayedIsCreateTableDone(final KuduTable table, final KuduRpc<R> rpc,
- final Callback<Deferred<R>,
- Master.IsCreateTableDoneResponsePB> retryCB,
- final Callback<Exception, Exception> errback) {
+ private <R> Deferred<R> delayedIsCreateTableDone(
+ final KuduTable table,
+ final KuduRpc<R> rpc,
+ final Callback<Deferred<R>,
+ Master.IsCreateTableDoneResponsePB> retryCB,
+ final Callback<Exception, Exception> errback) {
final class RetryTimer implements TimerTask {
public void run(final Timeout timeout) {
String tableId = table.getTableId();
final boolean has_permit = acquireMasterLookupPermit();
- if (!has_permit) {
+ if (!has_permit && !tablesNotServed.contains(tableId)) {
// If we failed to acquire a permit, it's worth checking if someone
// looked up the tablet we're interested in. Every once in a while
// this will save us a Master lookup.
- if (!tablesNotServed.contains(tableId)) {
- try {
- retryCB.call(null);
- return;
- } catch (Exception e) {
- // we're calling RetryRpcCB which doesn't throw exceptions, ignore
- }
+ try {
+ retryCB.call(null);
+ return;
+ } catch (Exception e) {
+ // we're calling RetryRpcCB which doesn't throw exceptions, ignore
}
}
IsCreateTableDoneRequest isCreateTableDoneRequest =
@@ -1027,11 +1015,11 @@ public class AsyncKuduClient implements AutoCloseable {
}
}
- long getSleepTimeForRpc(KuduRpc<?> rpc) {
- byte attemptCount = rpc.attempt;
+ private long getSleepTimeForRpc(KuduRpc<?> rpc) {
+ int attemptCount = rpc.attempt;
assert (attemptCount > 0);
if (attemptCount == 0) {
- LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: " + rpc,
+ LOG.warn("Possible bug: attempting to retry an RPC with no attempts. RPC: {}", rpc,
new Exception("Exception created to collect stack trace"));
attemptCount = 1;
}
@@ -1039,29 +1027,12 @@ public class AsyncKuduClient implements AutoCloseable {
long sleepTime = (long)(Math.pow(2.0, Math.min(attemptCount, 12)) *
sleepRandomizer.nextDouble());
if (LOG.isTraceEnabled()) {
- LOG.trace("Going to sleep for " + sleepTime + " at retry " + rpc.attempt);
+ LOG.trace("Going to sleep for {} at retry {}", sleepTime, rpc.attempt);
}
return sleepTime;
}
/**
- * Modifying the list returned by this method won't change how AsyncKuduClient behaves,
- * but calling certain methods on the returned TabletClients can. For example,
- * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
- * TabletClient#shutdown()}.
- * @return copy of the current TabletClients list
- */
- @VisibleForTesting
- List<TabletClient> getTabletClients() {
- return connectionCache.getImmutableTabletClientsList();
- }
-
- @VisibleForTesting
- TabletClient getTabletClient(String uuid) {
- return connectionCache.getClient(uuid);
- }
-
- /**
* Clears {@link #tableLocations} of the table's entries.
*
* This method makes the maps momentarily inconsistent, and should only be
@@ -1080,7 +1051,7 @@ public class AsyncKuduClient implements AutoCloseable {
* @return {@code true} if this RPC already had too many attempts,
* {@code false} otherwise (in which case it's OK to retry once more)
*/
- static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
+ private static boolean cannotRetryRequest(final KuduRpc<?> rpc) {
return rpc.deadlineTracker.timedOut() || rpc.attempt > MAX_RPC_ATTEMPTS;
}
@@ -1091,8 +1062,8 @@ public class AsyncKuduClient implements AutoCloseable {
* @param cause What was cause of the last failed attempt, if known.
* You can pass {@code null} if the cause is unknown.
*/
- static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
- final KuduException cause) {
+ private static <R> Deferred<R> tooManyAttemptsOrTimeout(final KuduRpc<R> request,
+ final KuduException cause) {
String message;
if (request.attempt > MAX_RPC_ATTEMPTS) {
message = "Too many attempts: ";
@@ -1127,7 +1098,7 @@ public class AsyncKuduClient implements AutoCloseable {
// this will save us a Master lookup.
TableLocationsCache.Entry entry = getTableLocationEntry(tableId, partitionKey);
if (entry != null && !entry.isNonCoveredRange() &&
- entry.getTablet().getLeaderUUID() != null) {
+ entry.getTablet().getLeaderServerInfo() != null) {
return Deferred.fromResult(null); // Looks like no lookup needed.
}
}
@@ -1164,9 +1135,8 @@ public class AsyncKuduClient implements AutoCloseable {
*/
Deferred<Master.GetTableLocationsResponsePB> getMasterTableLocationsPB(KuduRpc<?> parentRpc) {
// TODO(todd): stop using this 'masterTable' hack.
- return ConnectToCluster.run(masterTable, masterAddresses, parentRpc, connectionCache,
- defaultAdminOperationTimeoutMs)
- .addCallback(
+ return ConnectToCluster.run(masterTable, masterAddresses, parentRpc,
+ defaultAdminOperationTimeoutMs).addCallback(
new Callback<Master.GetTableLocationsResponsePB, ConnectToClusterResponse>() {
@Override
public Master.GetTableLocationsResponsePB call(ConnectToClusterResponse resp) {
@@ -1306,8 +1276,8 @@ public class AsyncKuduClient implements AutoCloseable {
* We're handling a tablet server that's telling us it doesn't have the tablet we're asking for.
* We're in the context of decode() meaning we need to either callback or retry later.
*/
- <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
- invalidateTabletCache(rpc.getTablet(), server);
+ <R> void handleTabletNotFound(final KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
+ invalidateTabletCache(rpc.getTablet(), info);
handleRetryableError(rpc, ex);
}
@@ -1315,8 +1285,8 @@ public class AsyncKuduClient implements AutoCloseable {
* A tablet server is letting us know that it isn't the specified tablet's leader in response
* a RPC, so we need to demote it and retry.
*/
- <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, TabletClient server) {
- rpc.getTablet().demoteLeader(server.getServerInfo().getUuid());
+ <R> void handleNotLeader(final KuduRpc<R> rpc, KuduException ex, ServerInfo info) {
+ rpc.getTablet().demoteLeader(info.getUuid());
handleRetryableError(rpc, ex);
}
@@ -1370,12 +1340,40 @@ public class AsyncKuduClient implements AutoCloseable {
* Remove the tablet server from the RemoteTablet's locations. Right now nothing is removing
* the tablet itself from the caches.
*/
- private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
- String uuid = server.getServerInfo().getUuid();
+ private void invalidateTabletCache(RemoteTablet tablet, ServerInfo info) {
+ final String uuid = info.getUuid();
LOG.info("Removing server {} from this tablet's cache {}", uuid, tablet.getTabletId());
tablet.removeTabletClient(uuid);
}
+ /**
+ * Translate master-provided information {@link Master.TSInfoPB} on a tablet server into internal
+ * {@link ServerInfo} representation.
+ *
+ * @param tsInfoPB master-provided information for the tablet server
+ * @return an object that contains all the server's information
+ * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+ */
+ private ServerInfo resolveTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
+ final List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
+ final String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
+ if (addresses.isEmpty()) {
+ LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
+ return null;
+ }
+
+ // from meta_cache.cc
+ // TODO: if the TS advertises multiple host/ports, pick the right one
+ // based on some kind of policy. For now just use the first always.
+ final HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
+ final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+ if (inetAddress == null) {
+ throw new UnknownHostException(
+ "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
+ }
+ return new ServerInfo(uuid, hostPort, inetAddress);
+ }
+
/** Callback executed when a master lookup completes. */
private final class MasterLookupCB implements Callback<Object,
Master.GetTableLocationsResponsePB> {
@@ -1418,7 +1416,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
}
- boolean acquireMasterLookupPermit() {
+ private boolean acquireMasterLookupPermit() {
try {
// With such a low timeout, the JVM may chose to spin-wait instead of
// de-scheduling the thread (and causing context switches and whatnot).
@@ -1433,7 +1431,7 @@ public class AsyncKuduClient implements AutoCloseable {
* Releases a master lookup permit that was acquired.
* @see #acquireMasterLookupPermit
*/
- void releaseMasterLookupPermit() {
+ private void releaseMasterLookupPermit() {
masterLookups.release();
}
@@ -1477,7 +1475,7 @@ public class AsyncKuduClient implements AutoCloseable {
List<ServerInfo> servers = new ArrayList<>(tabletPb.getReplicasCount());
for (Master.TabletLocationsPB.ReplicaPB replica : tabletPb.getReplicasList()) {
try {
- ServerInfo serverInfo = connectionCache.connectTS(replica.getTsInfo());
+ ServerInfo serverInfo = resolveTS(replica.getTsInfo());
if (serverInfo != null) {
servers.add(serverInfo);
}
@@ -1508,7 +1506,7 @@ public class AsyncKuduClient implements AutoCloseable {
// right away. If not, we throw an exception that RetryRpcErrback will understand as needing to
// sleep before retrying.
TableLocationsCache.Entry entry = locationsCache.get(requestPartitionKey);
- if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderUUID() == null) {
+ if (!entry.isNonCoveredRange() && entry.getTablet().getLeaderServerInfo() == null) {
throw new NoLeaderFoundException(
Status.NotFound("Tablet " + entry.toString() + " doesn't have a leader"));
}
@@ -1588,8 +1586,9 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * Invokes {@link #shutdown()} and waits. This method returns
- * void, so consider invoking shutdown directly if there's a need to handle dangling RPCs.
+ * Invokes {@link #shutdown()} and waits. This method returns void, so consider invoking
+ * {@link #shutdown()} directly if there's a need to handle dangling RPCs.
+ *
* @throws Exception if an error happens while closing the connections
*/
@Override
@@ -1607,7 +1606,8 @@ public class AsyncKuduClient implements AutoCloseable {
* <li>Releases all other resources.</li>
* </ul>
* <strong>Not calling this method before losing the last reference to this
- * instance may result in data loss and other unwanted side effects</strong>
+ * instance may result in data loss and other unwanted side effects.</strong>
+ *
* @return A {@link Deferred}, whose callback chain will be invoked once all
* of the above have been done. If this callback chain doesn't fail, then
* the clean shutdown will be successful, and all the data will be safe on
@@ -1628,6 +1628,7 @@ public class AsyncKuduClient implements AutoCloseable {
super("AsyncKuduClient@" + AsyncKuduClient.super.hashCode() + " shutdown");
}
+ @Override
public void run() {
// This terminates the Executor.
channelFactory.releaseExternalResources();
@@ -1676,7 +1677,7 @@ public class AsyncKuduClient implements AutoCloseable {
// concurrent modification during the iteration.
Set<AsyncKuduSession> copyOfSessions;
synchronized (sessions) {
- copyOfSessions = new HashSet<AsyncKuduSession>(sessions);
+ copyOfSessions = new HashSet<>(sessions);
}
if (sessions.isEmpty()) {
return Deferred.fromResult(null);
@@ -1690,7 +1691,7 @@ public class AsyncKuduClient implements AutoCloseable {
return Deferred.group(deferreds);
}
- private boolean isMasterTable(String tableId) {
+ private static boolean isMasterTable(String tableId) {
// Checking that it's the same instance so there's absolutely no chance of confusing the master
// 'table' for a user one.
return MASTER_TABLE_NAME_PLACEHOLDER == tableId;
@@ -1709,6 +1710,14 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
+ * @return copy of the current TabletClients list
+ */
+ @VisibleForTesting
+ List<Connection> getConnectionListCopy() {
+ return connectionCache.getConnectionListCopy();
+ }
+
+ /**
* Builder class to use in order to connect to Kudu.
* All the parameters beyond those in the constructors are optional.
*/
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
index 22b31f0..dfba55d 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectToCluster.java
@@ -26,6 +26,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.net.HostAndPort;
import com.stumbleupon.async.Callback;
@@ -80,7 +81,8 @@ final class ConnectToCluster {
private static Deferred<ConnectToMasterResponsePB> connectToMaster(
final KuduTable masterTable,
- final TabletClient masterClient, KuduRpc<?> parentRpc,
+ final RpcProxy masterProxy,
+ KuduRpc<?> parentRpc,
long defaultTimeoutMs) {
// TODO: Handle the situation when multiple in-flight RPCs all want to query the masters,
// basically reuse in some way the master permits.
@@ -93,7 +95,7 @@ final class ConnectToCluster {
}
Deferred<ConnectToMasterResponsePB> d = rpc.getDeferred();
rpc.attempt++;
- masterClient.sendRpc(rpc);
+ masterProxy.sendRpc(rpc);
// If we are connecting to an older version of Kudu, we'll get an invalid request
// error. In that case, we resend using the older version of the RPC.
@@ -107,10 +109,10 @@ final class ConnectToCluster {
rre.getErrPB().getUnsupportedFeatureFlagsCount() > 0) {
AsyncKuduClient.LOG.debug("Falling back to GetMasterRegistration() RPC to connect " +
"to server running Kudu < 1.3.");
- Deferred<ConnectToMasterResponsePB> newAttempt = rpc.getDeferred();
- assert newAttempt != null;
+ final Deferred<ConnectToMasterResponsePB> newAttempt =
+ Preconditions.checkNotNull(rpc.getDeferred());
rpc.setUseOldMethod();
- masterClient.sendRpc(rpc);
+ masterProxy.sendRpc(rpc);
return newAttempt;
}
}
@@ -128,8 +130,6 @@ final class ConnectToCluster {
* @param masterTable the "placeholder" table used by AsyncKuduClient
* @param masterAddresses the addresses of masters to fetch from
* @param parentRpc RPC that prompted a master lookup, can be null
- * @param connCache the client's connection cache, used for creating connections
- * to masters
* @param defaultTimeoutMs timeout to use for RPCs if the parentRpc has no timeout
* @return a Deferred object for the cluster connection status
*/
@@ -137,7 +137,6 @@ final class ConnectToCluster {
KuduTable masterTable,
List<HostAndPort> masterAddresses,
KuduRpc<?> parentRpc,
- ConnectionCache connCache,
long defaultTimeoutMs) {
ConnectToCluster connector = new ConnectToCluster(masterAddresses);
@@ -146,14 +145,14 @@ final class ConnectToCluster {
// deferred.
for (HostAndPort hostAndPort : masterAddresses) {
Deferred<ConnectToMasterResponsePB> d;
- TabletClient client = connCache.newMasterClient(hostAndPort);
- if (client == null) {
+ RpcProxy proxy = masterTable.getAsyncClient().newMasterRpcProxy(hostAndPort);
+ if (proxy != null) {
+ d = connectToMaster(masterTable, proxy, parentRpc, defaultTimeoutMs);
+ } else {
String message = "Couldn't resolve this master's address " + hostAndPort.toString();
LOG.warn(message);
Status statusIOE = Status.IOError(message);
d = Deferred.fromError(new NonRecoverableException(statusIOE));
- } else {
- d = connectToMaster(masterTable, client, parentRpc, defaultTimeoutMs);
}
d.addCallbacks(connector.callbackForNode(hostAndPort),
connector.errbackForNode(hostAndPort));
@@ -192,56 +191,50 @@ final class ConnectToCluster {
* to responseD.
*/
private void incrementCountAndCheckExhausted() {
- if (countResponsesReceived.incrementAndGet() == numMasters) {
- if (responseDCalled.compareAndSet(false, true)) {
-
- // We want `allUnrecoverable` to only be true if all the masters came back with
- // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
- // that replies with RecoverableException or with an ok response but is a FOLLOWER is
- // enough to keep us retrying.
- boolean allUnrecoverable = true;
- if (exceptionsReceived.size() == countResponsesReceived.get()) {
- for (Exception ex : exceptionsReceived) {
- if (!(ex instanceof NonRecoverableException)) {
- allUnrecoverable = false;
- break;
- }
+ if (countResponsesReceived.incrementAndGet() == numMasters &&
+ responseDCalled.compareAndSet(false, true)) {
+ // We want `allUnrecoverable` to only be true if all the masters came back with
+ // NonRecoverableException so that we know for sure we can't retry anymore. Just one master
+ // that replies with RecoverableException or with an ok response but is a FOLLOWER is
+ // enough to keep us retrying.
+ boolean allUnrecoverable = true;
+ if (exceptionsReceived.size() == countResponsesReceived.get()) {
+ for (Exception ex : exceptionsReceived) {
+ if (!(ex instanceof NonRecoverableException)) {
+ allUnrecoverable = false;
+ break;
}
- } else {
- allUnrecoverable = false;
}
+ } else {
+ allUnrecoverable = false;
+ }
- String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
- if (allUnrecoverable) {
- // This will stop retries.
- String msg = String.format("Couldn't find a valid master in (%s). " +
- "Exceptions received: %s", allHosts,
- Joiner.on(",").join(Lists.transform(
- exceptionsReceived, Functions.toStringFunction())));
- Status s = Status.ServiceUnavailable(msg);
- responseD.callback(new NonRecoverableException(s));
+ String allHosts = NetUtil.hostsAndPortsToString(masterAddrs);
+ if (allUnrecoverable) {
+ // This will stop retries.
+ String msg = String.format("Couldn't find a valid master in (%s). " +
+ "Exceptions received: %s", allHosts,
+ Joiner.on(",").join(Lists.transform(
+ exceptionsReceived, Functions.toStringFunction())));
+ Status s = Status.ServiceUnavailable(msg);
+ responseD.callback(new NonRecoverableException(s));
+ } else {
+ String message = String.format("Master config (%s) has no leader.",
+ allHosts);
+ Exception ex;
+ if (exceptionsReceived.isEmpty()) {
+ LOG.warn("None of the provided masters {} is a leader; will retry", allHosts);
+ ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
} else {
- String message = String.format("Master config (%s) has no leader.",
- allHosts);
- Exception ex;
- if (exceptionsReceived.isEmpty()) {
- LOG.warn(String.format(
- "None of the provided masters (%s) is a leader, will retry.",
- allHosts));
- ex = new NoLeaderFoundException(Status.ServiceUnavailable(message));
- } else {
- LOG.warn(String.format(
- "Unable to find the leader master (%s), will retry",
- allHosts));
- String joinedMsg = message + " Exceptions received: " +
- Joiner.on(",").join(Lists.transform(
- exceptionsReceived, Functions.toStringFunction()));
- Status s = Status.ServiceUnavailable(joinedMsg);
- ex = new NoLeaderFoundException(s,
- exceptionsReceived.get(exceptionsReceived.size() - 1));
- }
- responseD.callback(ex);
+ LOG.warn("Unable to find the leader master {}; will retry", allHosts);
+ String joinedMsg = message + " Exceptions received: " +
+ Joiner.on(",").join(Lists.transform(
+ exceptionsReceived, Functions.toStringFunction()));
+ Status s = Status.ServiceUnavailable(joinedMsg);
+ ex = new NoLeaderFoundException(s,
+ exceptionsReceived.get(exceptionsReceived.size() - 1));
}
+ responseD.callback(ex);
}
}
}
@@ -273,8 +266,7 @@ final class ConnectToCluster {
// Someone else already found a leader. This is somewhat unexpected
// because this means two nodes think they're the leader, but it's
// not impossible. We'll just ignore it.
- LOG.debug("Callback already invoked, discarding response(" + r.toString() + ") from " +
- hostAndPort.toString());
+ LOG.debug("Callback already invoked, discarding response({}) from {}", r, hostAndPort);
return null;
}
@@ -304,7 +296,7 @@ final class ConnectToCluster {
@Override
public Void call(Exception e) throws Exception {
- LOG.warn("Error receiving a response from: " + hostAndPort, e);
+ LOG.warn("Error receiving response from {}", hostAndPort, e);
exceptionsReceived.add(e);
incrementCountAndCheckExhausted();
return null;
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
new file mode 100644
index 0000000..ea1e113
--- /dev/null
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Connection.java
@@ -0,0 +1,634 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.kudu.client;
+
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
+import javax.net.ssl.SSLException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.stumbleupon.async.Callback;
+import com.stumbleupon.async.Deferred;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelEvent;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.channel.DefaultChannelPipeline;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.SocketChannel;
+import org.jboss.netty.channel.socket.SocketChannelConfig;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.timeout.ReadTimeoutException;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kudu.client.Negotiator.Result;
+import org.apache.kudu.rpc.RpcHeader;
+import org.apache.kudu.rpc.RpcHeader.RpcFeatureFlag;
+
+/**
+ * Class representing a connection from the client to a Kudu server (master or tablet server):
+ * a high-level wrapper for the TCP connection between the client and the server.
+ * <p>
+ * It's a stateful handler that manages a connection to a Kudu server.
+ * <p>
+ * This handler manages the RPC IDs, and keeps track of the RPCs in flight for which
+ * a response is currently awaited, as well as temporarily buffered RPCs that are waiting
+ * to be sent to the server.
+ * <p>
+ * The connection state, the queued and in-flight messages, the negotiation result and the
+ * call ID counter are guarded by a private {@link ReentrantLock}. Messages enqueued before
+ * the connection is negotiated are buffered and sent once the negotiation completes.
+ * TODO(aserbin) clarify on the socketReadTimeoutMs and using per-RPC timeout settings.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class Connection extends SimpleChannelUpstreamHandler {
+  /** Information on the target server. */
+  private final ServerInfo serverInfo;
+
+  /** Security context to use for connection negotiation. */
+  private final SecurityContext securityContext;
+
+  /** Read timeout for the connection (used by Netty's ReadTimeoutHandler). */
+  private final long socketReadTimeoutMs;
+
+  /** Timer to monitor read timeouts for the connection (used by Netty's ReadTimeoutHandler). */
+  private final HashedWheelTimer timer;
+
+  /** The underlying Netty socket channel. */
+  private final SocketChannel channel;
+
+  /**
+   * Set to true when a disconnect is initiated explicitly from the client side. exceptionCaught()
+   * then doesn't report the lost connection and ignores the SSLException a close may provoke.
+   */
+  private volatile boolean explicitlyDisconnected = false;
+
+  /** Logger: a sink for the log messages originated from this class. */
+  private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
+
+  private static final byte RPC_CURRENT_VERSION = 9;
+
+  /** Initial header sent by the client upon connection establishment. */
+  private static final byte[] CONNECTION_HEADER = new byte[]{'h', 'r', 'p', 'c',
+      RPC_CURRENT_VERSION, // RPC version.
+      0,
+      0
+  };
+
+  /** Lock to guard access to some of the fields below. */
+  private final ReentrantLock lock = new ReentrantLock();
+
+  /** The state of this object. */
+  @GuardedBy("lock")
+  private State state;
+
+  /**
+   * A hash table to store { callId, statusReportCallback } pairs, representing messages which have
+   * already been sent and pending responses from the server side. Once the server responds to a
+   * message, the corresponding entry is removed from the container and the response callback
+   * is invoked with the results represented by {@link CallResponseInfo}.
+   */
+  @GuardedBy("lock")
+  private HashMap<Integer, Callback<Void, CallResponseInfo>> inflightMessages = new HashMap<>();
+
+  /** Messages enqueued while the connection was not ready to start sending them over the wire. */
+  @GuardedBy("lock")
+  private ArrayList<QueuedMessage> queuedMessages = Lists.newArrayList();
+
+  /** The result of the connection negotiation. */
+  @GuardedBy("lock")
+  private Result negotiationResult = null;
+
+  /** A monotonically increasing counter for RPC IDs. */
+  @GuardedBy("lock")
+  private int nextCallId = 0;
+
+  Connection(ServerInfo serverInfo,
+             SecurityContext securityContext,
+             long socketReadTimeoutMs,
+             HashedWheelTimer timer,
+             ClientSocketChannelFactory channelFactory) {
+    this.serverInfo = serverInfo;
+    this.securityContext = securityContext;
+    this.state = State.NEW;
+    this.socketReadTimeoutMs = socketReadTimeoutMs;
+    this.timer = timer;
+    // init() reads socketReadTimeoutMs and timer, so they must be assigned before this point.
+    final ConnectionPipeline pipeline = new ConnectionPipeline();
+    pipeline.init();
+
+    channel = channelFactory.newChannel(pipeline);
+    SocketChannelConfig config = channel.getConfig();
+    config.setConnectTimeoutMillis(60000);
+    config.setTcpNoDelay(true);
+    // Unfortunately there is no way to override the keep-alive timeout in
+    // Java since the JRE doesn't expose any way to call setsockopt() with
+    // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
+    config.setKeepAlive(true);
+  }
+
+  /** Sends the connection header and starts the negotiation once TCP is connected. */
+  @Override
+  public void channelConnected(final ChannelHandlerContext ctx,
+                               final ChannelStateEvent e) {
+    lock.lock();
+    try {
+      Preconditions.checkState(state == State.CONNECTING);
+      state = State.NEGOTIATING;
+    } finally {
+      lock.unlock();
+    }
+    Channels.write(channel, ChannelBuffers.wrappedBuffer(CONNECTION_HEADER));
+    Negotiator negotiator = new Negotiator(serverInfo.getHostname(), securityContext);
+    ctx.getPipeline().addBefore(ctx.getName(), "negotiation", negotiator);
+    negotiator.sendHello(channel);
+  }
+
+  /** Trace-logs each upstream event before handing it to the default dispatching logic. */
+  @Override
+  public void handleUpstream(final ChannelHandlerContext ctx,
+                             final ChannelEvent e) throws Exception {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} upstream event {}", getLogPrefix(), e);
+    }
+    super.handleUpstream(ctx, e);
+  }
+
+  /** Fails all queued and in-flight messages since the channel has been disconnected. */
+  @Override
+  public void channelDisconnected(final ChannelHandlerContext ctx,
+                                  final ChannelStateEvent e) throws Exception {
+    // No need to call super.channelDisconnected(ctx, e) -- there should be nobody in the
+    // upstream pipeline after Connection itself. So, just handle the disconnection event here.
+    cleanup("connection disconnected");
+  }
+
+  /** Fails all queued and in-flight messages since the channel has been closed. */
+  @Override
+  public void channelClosed(final ChannelHandlerContext ctx,
+                            final ChannelStateEvent e) throws Exception {
+    // No need to call super.channelClosed(ctx, e) -- there should be nobody in the upstream
+    // pipeline after Connection itself. So, just handle the close event ourselves.
+    cleanup("connection closed");
+  }
+
+  /** Completes negotiation or dispatches an RPC response to its callback; forwards the rest. */
+  @Override
+  public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt) throws Exception {
+    Object m = evt.getMessage();
+    if (m instanceof Negotiator.Result) {
+      lock.lock();
+      try {
+        Preconditions.checkState(state == State.NEGOTIATING);
+        Preconditions.checkNotNull(queuedMessages);
+
+        state = State.READY;
+        negotiationResult = (Negotiator.Result) m;
+        List<QueuedMessage> queued = queuedMessages;
+        // The queuedMessages should not be used anymore once the connection is negotiated.
+        queuedMessages = null;
+        // Send out all the enqueued messages. This is done while holding the lock to preserve
+        // the sequence of the already enqueued and being-enqueued-right-now messages and simplify
+        // the logic which checks for the consistency using Preconditions.checkXxx() methods.
+        for (final QueuedMessage qm : queued) {
+          sendCallToWire(qm.message, qm.cb);
+        }
+      } finally {
+        lock.unlock();
+      }
+      return;
+    }
+
+    // Some other event which the connection does not handle.
+    if (!(m instanceof CallResponse)) {
+      ctx.sendUpstream(evt);
+      return;
+    }
+
+    final CallResponse response = (CallResponse) m;
+    final RpcHeader.ResponseHeader header = response.getHeader();
+    if (!header.hasCallId()) {
+      final int size = response.getTotalResponseSize();
+      final String msg = getLogPrefix() +
+          " RPC response (size: " + size + ") doesn't" + " have callID: " + header;
+      LOG.error(msg);
+      throw new NonRecoverableException(Status.Incomplete(msg));
+    }
+
+    final int callId = header.getCallId();
+    Callback<Void, CallResponseInfo> responseCbk;
+    lock.lock();
+    try {
+      Preconditions.checkState(state == State.READY);
+      responseCbk = inflightMessages.remove(callId);
+    } finally {
+      lock.unlock();
+    }
+
+    if (responseCbk == null) {
+      final String msg = getLogPrefix() + " invalid callID: " + callId;
+      LOG.error(msg);
+      // If we get a bad RPC ID back, we are probably somehow misaligned from
+      // the server. So, we disconnect the connection.
+      throw new NonRecoverableException(Status.IllegalState(msg));
+    }
+
+    if (!header.hasIsError() || !header.getIsError()) {
+      // The success case.
+      responseCbk.call(new CallResponseInfo(response, null));
+      return;
+    }
+
+    final RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder();
+    KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder);
+    final RpcHeader.ErrorStatusPB error = errorBuilder.build();
+    if (error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) ||
+        error.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) {
+      responseCbk.call(new CallResponseInfo(
+          response, new RecoverableException(Status.ServiceUnavailable(error.getMessage()))));
+      return;
+    }
+
+    final String message = getLogPrefix() + " server sent error " + error.getMessage();
+    LOG.error(message); // can be useful
+    responseCbk.call(new CallResponseInfo(
+        response, new RpcRemoteException(Status.RemoteError(message), error)));
+  }
+
+  /** Logs the failure and closes the channel, which fails all outstanding messages. */
+  @Override
+  public void exceptionCaught(final ChannelHandlerContext ctx,
+                              final ExceptionEvent event) {
+    final Throwable e = event.getCause();
+    final Channel c = event.getChannel();
+
+    if (e instanceof RejectedExecutionException) {
+      LOG.warn("{} RPC rejected by the executor: {} (ignore if shutting down)", getLogPrefix(), e);
+    } else if (e instanceof ReadTimeoutException) {
+      LOG.debug("{} encountered a read timeout; closing the channel", getLogPrefix());
+    } else if (e instanceof ClosedChannelException) {
+      if (!explicitlyDisconnected) {
+        LOG.info("{} lost connection to peer", getLogPrefix());
+      }
+    } else if (e instanceof SSLException && explicitlyDisconnected) {
+      // There's a race in Netty where, when we call Channel.close(), it tries
+      // to send a TLS 'shutdown' message and enters a shutdown state. If another
+      // thread races to send actual data on the channel, then Netty will get a
+      // bit confused that we are trying to send data and misinterpret it as a
+      // renegotiation attempt, and throw an SSLException. So, we just ignore any
+      // SSLException if we've already attempted to close.
+      LOG.debug("{} ignoring SSLException: already disconnected", getLogPrefix());
+    } else {
+      LOG.error("{} unexpected exception from downstream on {}: {}", getLogPrefix(), c, e);
+    }
+    if (c.isOpen()) {
+      Channels.close(c);        // Will trigger channelClosed(), which will cleanup()
+    } else {                    // else: presumably a connection timeout.
+      cleanup(e.getMessage());  // => need to cleanup() from here directly.
+    }
+  }
+
+  /** Returns the information on the target server of this connection. */
+  public ServerInfo getServerInfo() {
+    return serverInfo;
+  }
+
+  /**
+   * @return true iff the connection is in the DISCONNECTED state
+   */
+  boolean isDisconnected() {
+    lock.lock();
+    try {
+      return state == State.DISCONNECTED;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Returns the server features recorded in the connection negotiation result.
+   * TODO(aserbin) make it possible to avoid calling this when the server features are not known yet
+   * @return the set of server's features, if known (i.e. negotiated); null otherwise
+   */
+  @Nullable
+  Set<RpcFeatureFlag> getPeerFeatures() {
+    Set<RpcFeatureFlag> features = null;
+    lock.lock();
+    try {
+      if (negotiationResult != null) {
+        features = negotiationResult.serverFeatures;
+      }
+    } finally {
+      lock.unlock();
+    }
+    return features;
+  }
+
+  /**
+   * @return string representation of the peer (i.e. the server) information suitable for logging
+   */
+  String getLogPrefix() {
+    return "[peer " + serverInfo.getUuid() + "]";
+  }
+
+  /**
+   * Enqueues an outbound message to be sent to the server via Kudu RPC. Messages are accepted even
+   * before the connection is established and are sent once it's negotiated; the first enqueued
+   * message initiates the connection.
+   * @throws RecoverableException if the connection is DISCONNECTED; retry with a new connection
+   */
+  void enqueueMessage(RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb)
+      throws RecoverableException {
+    lock.lock();
+    try {
+      if (state == State.DISCONNECTED) {
+        // The upper-level caller should handle the exception and retry using a new connection.
+        throw new RecoverableException(Status.IllegalState(
+            "connection in DISCONNECTED state; cannot enqueue a message"));
+      }
+
+      if (state == State.NEW) {
+        // Schedule connecting to the server.
+        connect();
+      }
+
+      // Set the call identifier for the outgoing RPC.
+      final int callId = nextCallId++;
+      RpcHeader.RequestHeader.Builder headerBuilder = msg.getHeaderBuilder();
+      headerBuilder.setCallId(callId);
+
+      // Cap the RPC timeout (if one is set) at the socket read timeout (if one is configured).
+      if (socketReadTimeoutMs > 0) {
+        final int timeoutMs = headerBuilder.getTimeoutMillis();
+        if (timeoutMs > 0) {
+          headerBuilder.setTimeoutMillis((int) Math.min(timeoutMs, socketReadTimeoutMs));
+        }
+      }
+
+      // If the connection hasn't been negotiated yet, add the message into the queuedMessages list.
+      // The elements of the queuedMessages list will be processed when the negotiation either
+      // succeeds or fails.
+      if (state != State.READY) {
+        queuedMessages.add(new QueuedMessage(msg, cb));
+        return;
+      }
+
+      // It's time to initiate sending the message over the wire.
+      sendCallToWire(msg, cb);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Triggers the channel to be disconnected, which will asynchronously cause all
+   * queued and in-flight RPCs to be failed. This method is idempotent.
+   *
+   * @return future object to wait on the disconnect completion, if necessary
+   */
+  ChannelFuture disconnect() {
+    explicitlyDisconnected = true;
+    return Channels.disconnect(channel);
+  }
+
+  /**
+   * If open, forcefully shut down the connection to the server. This is the same as
+   * {@link #disconnect}, but it returns Deferred instead of ChannelFuture.
+   *
+   * @return deferred object for tracking the shutting down of this connection
+   */
+  Deferred<Void> shutdown() {
+    final ChannelFuture disconnectFuture = disconnect();
+    final Deferred<Void> d = new Deferred<>();
+    disconnectFuture.addListener(new ChannelFutureListener() {
+      @Override
+      public void operationComplete(final ChannelFuture future) {
+        if (future.isSuccess()) {
+          d.callback(null);
+          return;
+        }
+        final Throwable t = future.getCause();
+        if (t instanceof Exception) {
+          d.callback(t);
+        } else {  // A bare 'this' here would be the listener; report the Connection instead.
+          d.callback(new NonRecoverableException(
+              Status.IllegalState("failed to shutdown: " + Connection.this), t));
+        }
+      }
+    });
+    return d;
+  }
+
+  /** Returns a string representation of this object, suitable for printing into the logs. */
+  @Override
+  public String toString() {
+    final StringBuilder buf = new StringBuilder();
+    buf.append("Connection@")
+        .append(hashCode())
+        .append("(channel=")
+        .append(channel)
+        .append(", uuid=")
+        .append(serverInfo.getUuid());
+    int queuedMessagesNum = 0;
+    int inflightMessagesNum = 0;
+    lock.lock();
+    try {
+      queuedMessagesNum = queuedMessages == null ? 0 : queuedMessages.size();
+      inflightMessagesNum = inflightMessages == null ? 0 : inflightMessages.size();
+    } finally {
+      lock.unlock();
+    }
+    buf.append(", #queued=").append(queuedMessagesNum)
+        .append(", #inflight=").append(inflightMessagesNum)
+        .append(")");
+    return buf.toString();
+  }
+
+  /**
+   * This is a test-only method.
+   *
+   * @return true iff the connection is in the READY state
+   */
+  @VisibleForTesting
+  boolean isReady() {
+    lock.lock();
+    try {
+      return state == State.READY;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /** Registers the callback under the message's call ID as in-flight and writes the message. */
+  @GuardedBy("lock")
+  private void sendCallToWire(final RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb) {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+    Preconditions.checkState(state == State.READY);
+    Preconditions.checkNotNull(inflightMessages);
+
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} sending {}", getLogPrefix(), msg);
+    }
+    final int callId = msg.getHeaderBuilder().getCallId();
+    final Callback<Void, CallResponseInfo> empty = inflightMessages.put(callId, cb);
+    Preconditions.checkArgument(empty == null);
+    Channels.write(channel, msg);
+  }
+
+  /**
+   * Process the fact that the connection has been disconnected: update the state of this object and
+   * clean up any outstanding or lingering messages, notifying on the error via their status
+   * callbacks. The callbacks' owners are supposed to handle the error and retry sending the
+   * messages, if needed. Calling this more than once is a no-op after the first call.
+   *
+   * @param errorMessage a string to describe the cause of the cleanup
+   */
+  private void cleanup(final String errorMessage) {
+    List<QueuedMessage> queued;
+    Map<Integer, Callback<Void, CallResponseInfo>> inflight;
+
+    lock.lock();
+    try {
+      if (state == State.DISCONNECTED) {
+        Preconditions.checkState(queuedMessages == null);
+        Preconditions.checkState(inflightMessages == null);
+        return;
+      }
+
+      queued = queuedMessages;
+      queuedMessages = null;
+
+      inflight = inflightMessages;
+      inflightMessages = null;
+
+      state = State.DISCONNECTED;
+    } finally {
+      lock.unlock();
+    }
+    final Status error = Status.NetworkError(getLogPrefix() + " " +
+        (errorMessage == null ? "connection reset" : errorMessage));
+    final RecoverableException exception = new RecoverableException(error);
+
+    for (Callback<Void, CallResponseInfo> cb : inflight.values()) {
+      try {
+        cb.call(new CallResponseInfo(null, exception));
+      } catch (Exception e) {
+        LOG.warn("{} exception while aborting in-flight call: {}", getLogPrefix(), e);
+      }
+    }
+    // queuedMessages was already drained (set to null) if the connection had reached READY.
+    if (queued != null) {
+      for (QueuedMessage qm : queued) {
+        try {
+          qm.cb.call(new CallResponseInfo(null, exception));
+        } catch (Exception e) {
+          LOG.warn("{} exception while aborting enqueued call: {}", getLogPrefix(), e);
+        }
+      }
+    }
+  }
+
+  /** Moves to CONNECTING and starts opening the TCP connection; the caller holds the lock. */
+  private void connect() {
+    Preconditions.checkState(lock.isHeldByCurrentThread());
+    Preconditions.checkState(state == State.NEW);
+    state = State.CONNECTING;
+    channel.connect(new InetSocketAddress(serverInfo.getResolvedAddress(), serverInfo.getPort()));
+  }
+
+  /** State of the Connection object. */
+  private enum State {
+    NEW,          // The object has just been created.
+    CONNECTING,   // The establishment of TCP connection to the server has started.
+    NEGOTIATING,  // The connection negotiation has started.
+    READY,        // The connection to the server has been opened, negotiated, and ready to use.
+    DISCONNECTED, // The TCP connection has been dropped off.
+  }
+
+  /**
+   * The class to represent RPC response received from the remote server.
+   * If the {@code exception} is null, then it's a success case and the {@code response} contains
+   * the information on the response. Otherwise it's an error and the {@code exception} provides
+   * information on the error. For the recoverable error case, the {@code exception} is of
+   * {@link RecoverableException} type, otherwise it's of {@link NonRecoverableException} type.
+   */
+  static final class CallResponseInfo {
+    public final CallResponse response;
+    public final KuduException exception;
+
+    CallResponseInfo(CallResponse response, KuduException exception) {
+      this.response = response;
+      this.exception = exception;
+    }
+  }
+
+  /** Internal class representing an enqueued outgoing message and its response callback. */
+  private static final class QueuedMessage {
+    private final RpcOutboundMessage message;
+    private final Callback<Void, CallResponseInfo> cb;
+
+    QueuedMessage(RpcOutboundMessage message, Callback<Void, CallResponseInfo> cb) {
+      this.message = message;
+      this.cb = cb;
+    }
+  }
+
+  /** Builds the pipeline: frame decoder, codecs, optional read-timeout handler, this handler. */
+  private final class ConnectionPipeline extends DefaultChannelPipeline {
+    void init() {
+      super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
+          KuduRpc.MAX_RPC_SIZE,
+          0, // length comes at offset 0
+          4, // length prefix is 4 bytes long
+          0, // no "length adjustment"
+          4 /* strip the length prefix */));
+      super.addLast("decode-inbound", new CallResponse.Decoder());
+      super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
+      if (Connection.this.socketReadTimeoutMs > 0) {
+        super.addLast("timeout-handler", new ReadTimeoutHandler(
+            Connection.this.timer, Connection.this.socketReadTimeoutMs, TimeUnit.MILLISECONDS));
+      }
+      super.addLast("kudu-handler", Connection.this);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
index 83ae8f0..844e79f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ConnectionCache.java
@@ -17,263 +17,126 @@
package org.apache.kudu.client;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
-import com.google.common.net.HostAndPort;
import com.stumbleupon.async.Deferred;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
-import org.jboss.netty.channel.DefaultChannelPipeline;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.socket.SocketChannelConfig;
-import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
-import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.kudu.Common;
-import org.apache.kudu.master.Master;
-import org.apache.kudu.util.NetUtil;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.util.HashedWheelTimer;
/**
- * The ConnectionCache is responsible for managing connections to masters and tablet servers.
- * There should only be one instance per Kudu client, and can <strong>not</strong> be shared between
- * clients.
- * <p>
- * {@link TabletClient}s are currently never removed from this cache. Since the map is keyed by
- * UUID, it would require an ever-growing set of unique tablet servers to encounter memory issues.
- * The reason for keeping disconnected connections in the cache is two-fold: 1) it makes
- * reconnecting easier since only UUIDs are passed around so we can use the dead TabletClient's
- * host and port to reconnect (see {@link #getLiveClient(String)}) and 2) having the dead
- * connection prevents tight looping when hitting "Connection refused"-type of errors.
+ * The ConnectionCache is responsible for managing connections to Kudu masters and tablet servers.
+ * There should only be one instance of ConnectionCache per Kudu client, and it should not be
+ * shared between clients.
* <p>
+ * Disconnected instances of the {@link Connection} class are replaced in the cache with new
+ * instances when the {@link #getConnection(ServerInfo)} method is called with the same
+ * destination. Since the map is keyed by UUID of the server, it would require an ever-growing
+ * set of unique Kudu servers to encounter memory issues.
+ *
* This class is thread-safe.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
class ConnectionCache {
+ /** Security context to use for connection negotiation. */
+ private final SecurityContext securityContext;
- private static final Logger LOG = LoggerFactory.getLogger(ConnectionCache.class);
+ /** Read timeout for connections (used by Netty's ReadTimeoutHandler). */
+ private final long socketReadTimeoutMs;
- /**
- * Cache that maps UUIDs to the clients connected to them.
- * <p>
- * This isn't a {@link ConcurrentHashMap} because we want to do an atomic get-and-put,
- * {@code putIfAbsent} isn't a good fit for us since it requires creating
- * an object that may be "wasted" in case another thread wins the insertion
- * race, and we don't want to create unnecessary connections.
- */
- @GuardedBy("lock")
- private final HashMap<String, TabletClient> uuid2client = new HashMap<>();
+ /** Timer to monitor read timeouts for connections (used by Netty's ReadTimeoutHandler). */
+ private final HashedWheelTimer timer;
- private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ /** Netty channel factory used by connections. */
+ private final ClientSocketChannelFactory channelFactory;
- private final Lock readLock = lock.readLock();
- private final Lock writeLock = lock.readLock();
+ /** Synchronization primitive to guard access to the fields below. */
+ private final ReentrantLock lock = new ReentrantLock();
- private final AsyncKuduClient kuduClient;
+ @GuardedBy("lock")
+ private final HashMap<String, Connection> uuid2connection = new HashMap<>();
/**
- * Create a new empty ConnectionCache that will used the passed client to create connections.
- * @param client a client that contains the information we need to create connections
+ * Create a new empty ConnectionCache given the specified parameters.
*/
- ConnectionCache(AsyncKuduClient client) {
- this.kuduClient = client;
+ ConnectionCache(SecurityContext securityContext,
+ long socketReadTimeoutMs,
+ HashedWheelTimer timer,
+ ClientSocketChannelFactory channelFactory) {
+ this.securityContext = securityContext;
+ this.socketReadTimeoutMs = socketReadTimeoutMs;
+ this.timer = timer;
+ this.channelFactory = channelFactory;
}
/**
- * Create a connection to a tablet server based on information provided by the master.
- * @param tsInfoPB master-provided information for the tablet server
- * @return an object that contains all the server's information
- * @throws UnknownHostException if we cannot resolve the tablet server's IP address
+ * Get a connection to the specified server. If no connection exists or the existing
+ * connection is already disconnected, create a new one. The newly created connection
+ * is not negotiated until the first RPC is enqueued for the target server.
+ *
+ * @param serverInfo the server end-point to connect to
+ * @return a cached or newly created connection to the specified destination, never null
*/
- ServerInfo connectTS(Master.TSInfoPB tsInfoPB) throws UnknownHostException {
- List<Common.HostPortPB> addresses = tsInfoPB.getRpcAddressesList();
- String uuid = tsInfoPB.getPermanentUuid().toStringUtf8();
- if (addresses.isEmpty()) {
- LOG.warn("Received a tablet server with no addresses, UUID: {}", uuid);
- return null;
- }
-
- // from meta_cache.cc
- // TODO: if the TS advertises multiple host/ports, pick the right one
- // based on some kind of policy. For now just use the first always.
- HostAndPort hostPort = ProtobufHelper.hostAndPortFromPB(addresses.get(0));
- InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
- if (inetAddress == null) {
- throw new UnknownHostException(
- "Failed to resolve the IP of `" + addresses.get(0).getHost() + "'");
- }
- return newClient(new ServerInfo(uuid, hostPort, inetAddress)).getServerInfo();
- }
-
- TabletClient newMasterClient(HostAndPort hostPort) {
- // We should pass a UUID here but we have a chicken and egg problem, we first need to
- // communicate with the masters to find out about them, and that's what we're trying to do.
- // The UUID is just used for logging and cache key, so instead we just use a constructed
- // string with the master host and port as.
- return newClient("master-" + hostPort.toString(), hostPort);
- }
+ public Connection getConnection(final ServerInfo serverInfo) {
+ Connection connection;
- TabletClient newClient(String uuid, HostAndPort hostPort) {
- InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
- if (inetAddress == null) {
- // TODO(todd): should we log the resolution failure? throw an exception?
- return null;
- }
-
- ServerInfo serverInfo = new ServerInfo(uuid, hostPort, inetAddress);
- return newClient(serverInfo);
- }
-
- TabletClient newClient(ServerInfo serverInfo) {
- TabletClient client;
- SocketChannel chan;
-
- writeLock.lock();
+ lock.lock();
try {
- client = uuid2client.get(serverInfo.getUuid());
- if (client != null && client.isAlive()) {
- return client;
+ // First try to find an existing connection.
+ connection = uuid2connection.get(serverInfo.getUuid());
+ if (connection == null || connection.isDisconnected()) {
+ // If no valid connection is found, create a new one.
+ connection = new Connection(serverInfo, securityContext,
+ socketReadTimeoutMs, timer, channelFactory);
+ uuid2connection.put(serverInfo.getUuid(), connection);
}
- final TabletClientPipeline pipeline = new TabletClientPipeline();
- client = pipeline.init(serverInfo);
- chan = this.kuduClient.getChannelFactory().newChannel(pipeline);
- uuid2client.put(serverInfo.getUuid(), client);
- } finally {
- writeLock.unlock();
- }
-
- final SocketChannelConfig config = chan.getConfig();
- config.setConnectTimeoutMillis(5000);
- config.setTcpNoDelay(true);
- // Unfortunately there is no way to override the keep-alive timeout in
- // Java since the JRE doesn't expose any way to call setsockopt() with
- // TCP_KEEPIDLE. And of course the default timeout is >2h. Sigh.
- config.setKeepAlive(true);
- chan.connect(new InetSocketAddress(serverInfo.getResolvedAddress(),
- serverInfo.getPort())); // Won't block.
- return client;
- }
-
- /**
- * Get a connection to a server for the given UUID. The returned connection can be down and its
- * state can be queried via {@link TabletClient#isAlive()}. To automatically get a client that's
- * gonna be re-connected automatically, use {@link #getLiveClient(String)}.
- * @param uuid server's identifier
- * @return a connection to a server, or null if the passed UUID isn't known
- */
- TabletClient getClient(String uuid) {
- readLock.lock();
- try {
- return uuid2client.get(uuid);
} finally {
- readLock.unlock();
+ lock.unlock();
}
- }
- /**
- * Get a connection to a server for the given UUID. This method will automatically call
- * {@link #newClient(String, InetAddress, int)} if the cached connection is down.
- * @param uuid server's identifier
- * @return a connection to a server, or null if the passed UUID isn't known
- */
- TabletClient getLiveClient(String uuid) {
- TabletClient client = getClient(uuid);
-
- if (client == null) {
- return null;
- } else if (client.isAlive()) {
- return client;
- } else {
- return newClient(client.getServerInfo());
- }
+ return connection;
}
/**
- * Asynchronously closes every socket, which will also cancel all the RPCs in flight.
+ * Asynchronously terminate every connection. This also cancels all the pending and in-flight
+ * RPCs.
*/
Deferred<ArrayList<Void>> disconnectEverything() {
- readLock.lock();
+ lock.lock();
try {
- ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2client.size());
- for (TabletClient ts : uuid2client.values()) {
- deferreds.add(ts.shutdown());
+ ArrayList<Deferred<Void>> deferreds = new ArrayList<>(uuid2connection.size());
+ for (Connection c : uuid2connection.values()) {
+ deferreds.add(c.shutdown());
}
return Deferred.group(deferreds);
} finally {
- readLock.unlock();
+ lock.unlock();
}
}
/**
- * The list returned by this method can't be modified,
- * but calling certain methods on the returned TabletClients can have an effect. For example,
- * it's possible to forcefully shutdown a connection to a tablet server by calling {@link
- * TabletClient#shutdown()}.
- * @return copy of the current TabletClients list
- */
- List<TabletClient> getImmutableTabletClientsList() {
- readLock.lock();
- try {
- return ImmutableList.copyOf(uuid2client.values());
- } finally {
- readLock.unlock();
- }
- }
-
- /**
- * Queries all the cached connections if they are alive.
- * @return true if all the connections are down, else false
+ * Return an immutable snapshot of all connections in the cache. This method is exposed only
+ * to allow {@link AsyncKuduClient} to forward it, so that tests can access the underlying
+ * elements of the cache.
+ *
+ * @return a copy of the list of all connections in the connection cache
*/
@VisibleForTesting
- boolean allConnectionsAreDead() {
- readLock.lock();
+ List<Connection> getConnectionListCopy() {
+ lock.lock();
try {
- for (TabletClient tserver : uuid2client.values()) {
- if (tserver.isAlive()) {
- return false;
- }
- }
+ return ImmutableList.copyOf(uuid2connection.values());
} finally {
- readLock.unlock();
- }
- return true;
- }
-
- private final class TabletClientPipeline extends DefaultChannelPipeline {
- TabletClient init(ServerInfo serverInfo) {
- super.addFirst("decode-frames", new LengthFieldBasedFrameDecoder(
- KuduRpc.MAX_RPC_SIZE,
- 0, // length comes at offset 0
- 4, // length prefix is 4 bytes long
- 0, // no "length adjustment"
- 4 /* strip the length prefix */));
- super.addLast("decode-inbound", new CallResponse.Decoder());
- super.addLast("encode-outbound", new RpcOutboundMessage.Encoder());
- AsyncKuduClient kuduClient = ConnectionCache.this.kuduClient;
- final TabletClient client = new TabletClient(kuduClient, serverInfo);
- if (kuduClient.getDefaultSocketReadTimeoutMs() > 0) {
- super.addLast("timeout-handler",
- new ReadTimeoutHandler(kuduClient.getTimer(),
- kuduClient.getDefaultSocketReadTimeoutMs(),
- TimeUnit.MILLISECONDS));
- }
- super.addLast("kudu-handler", client);
-
- return client;
+ lock.unlock();
}
}
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
index 30f1a9c..0de894f 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduRpc.java
@@ -131,13 +131,13 @@ public abstract class KuduRpc<R> {
* that access this attribute will have a happens-before relationship with
* the rest of the code, due to other existing synchronization.
*/
- byte attempt; // package-private for TabletClient and AsyncKuduClient only.
+ int attempt; // package-private for RpcProxy and AsyncKuduClient only.
/**
- * Set by TabletClient when isRequestTracked returns true to identify this RPC in the sequence of
+ * Set by RpcProxy when isRequestTracked returns true to identify this RPC in the sequence of
* RPCs sent by this client. Once it is set it should never change unless the RPC is reused.
*/
- long sequenceId = RequestTracker.NO_SEQ_NO;
+ private long sequenceId = RequestTracker.NO_SEQ_NO;
KuduRpc(KuduTable table) {
this.table = table;
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
index 46d99db..a917af2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Negotiator.java
@@ -147,9 +147,6 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
*/
private DecoderEmbedder<ChannelBuffer> sslEmbedder;
- /** True if we have negotiated TLS with the server */
- private boolean negotiatedTls;
-
/**
* The nonce sent from the server to the client, or null if negotiation has
* not yet taken place, or the server does not send a nonce.
@@ -290,7 +287,7 @@ public class Negotiator extends SimpleChannelUpstreamHandler {
// Store the supported features advertised by the server.
serverFeatures = getFeatureFlags(response);
// If the server supports TLS, we will always speak TLS to it.
- negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
+ final boolean negotiatedTls = serverFeatures.contains(RpcFeatureFlag.TLS);
// Check the negotiated authentication type sent by the server.
chosenAuthnType = chooseAuthenticationType(response);
http://git-wip-us.apache.org/repos/asf/kudu/blob/58248841/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
index d4702f8..a6025f7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RemoteTablet.java
@@ -21,6 +21,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import com.google.common.base.Objects;
@@ -38,7 +39,7 @@ import org.apache.kudu.master.Master;
* This class encapsulates the information regarding a tablet and its locations.
* <p>
* RemoteTablet's main function is to keep track of where the leader for this
- * tablet is. For example, an RPC might call {@link #getLeaderUUID()}, contact that TS, find
+ * tablet is. For example, an RPC might call {@link #getLeaderServerInfo()}, contact that TS, find
* it's not the leader anymore, and then call {@link #demoteLeader(String)}.
* <p>
* A RemoteTablet's life is expected to be long in a cluster where roles aren't changing often,
@@ -138,53 +139,61 @@ class RemoteTablet implements Comparable<RemoteTablet> {
}
/**
- * Gets the UUID of the tablet server that we think holds the leader replica for this tablet.
- * @return a UUID of a tablet server that we think has the leader, else null
+ * Get the information on the tablet server that we think holds the leader replica for this
+ * tablet.
+ *
+ * @return information on a tablet server that we think has the leader, else null
*/
- String getLeaderUUID() {
+ @Nullable
+ ServerInfo getLeaderServerInfo() {
synchronized (tabletServers) {
- return leaderUuid;
+ return tabletServers.get(leaderUuid);
}
}
/**
- * Gets the UUID of the closest server. If none is closer than the others, returns a random
- * server UUID.
- * @return the UUID of the closest server, which might be any if none is closer, or null if this
- * cache doesn't know of any servers
+ * Get the information on the closest (local) server. If no server is local, return
+ * the information on an arbitrary known server (the last one iterated over).
+ *
+ * @return the information on the closest server, which might be any if none is closer, or null
+ * if this cache doesn't know any servers.
*/
- String getClosestUUID() {
+ @Nullable
+ ServerInfo getClosestServerInfo() {
synchronized (tabletServers) {
- String lastUuid = null;
- for (ServerInfo serverInfo : tabletServers.values()) {
- lastUuid = serverInfo.getUuid();
- if (serverInfo.isLocal()) {
- return serverInfo.getUuid();
+ ServerInfo last = null;
+ for (ServerInfo e : tabletServers.values()) {
+ last = e;
+ if (e.isLocal()) {
+ return e;
}
}
- return lastUuid;
+ return last;
}
}
/**
* Helper function to centralize the calling of methods based on the passed replica selection
* mechanism.
+ *
* @param replicaSelection replica selection mechanism to use
- * @return a UUID for the server that matches the selection, can be null
+ * @return information on the server that matches the selection, can be null
*/
- String getReplicaSelectedUUID(ReplicaSelection replicaSelection) {
+ @Nullable
+ ServerInfo getReplicaSelectedServerInfo(ReplicaSelection replicaSelection) {
switch (replicaSelection) {
case LEADER_ONLY:
- return getLeaderUUID();
+ return getLeaderServerInfo();
case CLOSEST_REPLICA:
- return getClosestUUID();
+ return getClosestServerInfo();
default:
- throw new RuntimeException("Unknown replica selection mechanism " + replicaSelection);
+ throw new RuntimeException("unknown replica selection mechanism " + replicaSelection);
}
}
/**
- * Gets the replicas of this tablet. The returned list may not be mutated.
+ * Get replicas of this tablet. The returned list may not be mutated.
+ *
* @return the replicas of the tablet
*/
List<LocatedTablet.Replica> getReplicas() {