You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/04/28 04:32:38 UTC
[2/5] incubator-kudu git commit: KUDU-1364. Don't clear the cache
when a server disconnects
KUDU-1364. Don't clear the cache when a server disconnects
Folks seem to agree that, in the Java client, clearing the cache when a
server gets disconnected (for any reason) is not a good idea.
This patch does major surgery on how we handle cached TabletClients.
We now always keep them in RemoteTablet and do our best to remember
whether they were the leader. The only cache that clients are still
removed from is ip2client. We now rely on the TabletClient's 'dead'
attribute to know whether a connection is stale and needs to be re-established.
It's not clear whether (or when) entries need to be removed from client2tablets.
We'll probably only start removing from it in the future, once we support
dropping ranges.
The patch also introduces a new integration test for the client. It's
still super basic but gives more confidence in the change.
It also fixes a dumb bug that was killing scanners too early whenever
a disconnection occurred.
Change-Id: I8606bfcedb09af57b66ba0f065067f0f3335a4a8
Reviewed-on: http://gerrit.cloudera.org:8080/2449
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/c4180bb0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/c4180bb0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/c4180bb0
Branch: refs/heads/master
Commit: c4180bb03bf3425756ea09966f004153a8a31f72
Parents: 3944122
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Thu Mar 3 20:58:38 2016 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Thu Apr 28 01:48:53 2016 +0000
----------------------------------------------------------------------
.../java/org/kududb/client/AsyncKuduClient.java | 188 ++++++---
.../org/kududb/client/AsyncKuduScanner.java | 2 +
.../java/org/kududb/client/TabletClient.java | 51 ++-
.../test/java/org/kududb/client/ITClient.java | 396 +++++++++++++++++++
.../org/kududb/client/TestAsyncKuduClient.java | 20 +-
5 files changed, 591 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
index d7c5b4d..6482996 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduClient.java
@@ -582,12 +582,12 @@ public class AsyncKuduClient implements AutoCloseable {
final TabletClient client = clientFor(tablet);
final KuduRpc<AsyncKuduScanner.Response> next_request = scanner.getNextRowsRequest();
final Deferred<AsyncKuduScanner.Response> d = next_request.getDeferred();
- if (client == null) {
- // Oops, we no longer know anything about this client or tabletSlice. Our
- // cache was probably invalidated while the client was scanning. This
- // means that we lost the connection to that TabletServer, so we have to
- // try to re-connect and check if the scanner is still good.
- return sendRpcToTablet(next_request);
+ if (client == null || !client.isAlive()) {
+ // A null client means we either don't know about this tablet anymore (unlikely) or we
+ // couldn't find a leader (which could be triggered by a read timeout).
+ // We'll first delay the RPC in case things take some time to settle down, then retry.
+ delayedSendRpcToTablet(next_request, null);
+ return next_request.getDeferred();
}
next_request.attempt++;
client.sendRpc(next_request);
@@ -596,8 +596,8 @@ public class AsyncKuduClient implements AutoCloseable {
/**
* Package-private access point for {@link AsyncKuduScanner}s to close themselves.
- * @param scanner The scanner to close.
- * @return A deferred object that indicates the completion of the request.
+ * @param scanner the scanner to close
+ * @return a deferred object that indicates the completion of the request.
* The {@link AsyncKuduScanner.Response} can contain rows that were left to scan.
*/
Deferred<AsyncKuduScanner.Response> closeScanner(final AsyncKuduScanner scanner) {
@@ -608,9 +608,9 @@ public class AsyncKuduClient implements AutoCloseable {
}
final TabletClient client = clientFor(tablet);
- if (client == null) {
- // Oops, we no longer know anything about this client or tabletSlice. Our
- // cache was probably invalidated while the client was scanning. So
+ if (client == null || !client.isAlive()) {
+ // Oops, we couldn't find a tablet server that hosts this tablet. Our
+ // cache was probably invalidated while the client was scanning. So
// we can't close this scanner properly.
LOG.warn("Cannot close " + scanner + " properly, no connection open for "
+ (tablet == null ? null : tablet));
@@ -643,13 +643,45 @@ public class AsyncKuduClient implements AutoCloseable {
request.setPropagatedTimestamp(lastPropagatedTs);
}
+ // If we found a tablet, we'll try to find the TS to talk to. If that TS was previously
+ // disconnected, say because we didn't query that tablet for some seconds, then we'll try to
+ // reconnect based on the old information. If that fails, we'll instead continue with the next
+ // block that queries the master.
if (tablet != null) {
TabletClient tabletClient = clientFor(tablet);
if (tabletClient != null) {
- request.setTablet(tablet);
final Deferred<R> d = request.getDeferred();
- tabletClient.sendRpc(request);
- return d;
+ if (tabletClient.isAlive()) {
+ request.setTablet(tablet);
+ tabletClient.sendRpc(request);
+ return d;
+ }
+ try {
+ tablet.reconnectTabletClient(tabletClient);
+ } catch (UnknownHostException e) {
+ LOG.error("Cached tablet server {}'s host cannot be resolved, will query the master",
+ tabletClient.getUuid(), e);
+ // Because of this exception, clientFor() below won't be able to find a newTabletClient
+ // and we'll delay the RPC.
+ }
+ TabletClient newTabletClient = clientFor(tablet);
+ assert (tabletClient != newTabletClient);
+
+ if (newTabletClient == null) {
+ // Wait a little bit before hitting the master.
+ delayedSendRpcToTablet(request, null);
+ return request.getDeferred();
+ }
+
+ if (!newTabletClient.isAlive()) {
+ LOG.debug("Tried reconnecting to tablet server {} but failed, " +
+ "will query the master", tabletClient.getUuid());
+ // Let fall through.
+ } else {
+ request.setTablet(tablet);
+ newTabletClient.sendRpc(request);
+ return d;
+ }
}
}
@@ -687,6 +719,7 @@ public class AsyncKuduClient implements AutoCloseable {
this.request = request;
}
public Deferred<R> call(final D arg) {
+ LOG.debug("Retrying sending RPC {} after lookup", request);
return sendRpcToTablet(request); // Retry the RPC.
}
public String toString() {
@@ -937,6 +970,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
final Exception e = new NonRecoverableException(message + request, cause);
request.errback(e);
+ LOG.debug("Cannot continue with this RPC: {} because of: {}", request, message, e);
return Deferred.fromError(e);
}
@@ -1142,7 +1176,7 @@ public class AsyncKuduClient implements AutoCloseable {
private void invalidateTabletCache(RemoteTablet tablet, TabletClient server) {
LOG.info("Removing server " + server.getUuid() + " from this tablet's cache " +
tablet.getTabletIdAsString());
- tablet.removeTabletServer(server);
+ tablet.removeTabletClient(server);
}
/** Callback executed when a master lookup completes. */
@@ -1218,7 +1252,7 @@ public class AsyncKuduClient implements AutoCloseable {
// If we already know about this one, just refresh the locations
RemoteTablet currentTablet = tablet2client.get(tabletId);
if (currentTablet != null) {
- currentTablet.refreshServers(tabletPb);
+ currentTablet.refreshTabletClients(tabletPb);
continue;
}
@@ -1231,7 +1265,7 @@ public class AsyncKuduClient implements AutoCloseable {
}
LOG.info("Discovered tablet {} for table {} with partition {}",
tabletId.toString(Charset.defaultCharset()), tableName, rt.getPartition());
- rt.refreshServers(tabletPb);
+ rt.refreshTabletClients(tabletPb);
// This is making this tablet available
// Even if two clients were racing in this method they are putting the same RemoteTablet
// with the same start key in the CSLM in the end
@@ -1328,7 +1362,7 @@ public class AsyncKuduClient implements AutoCloseable {
return client;
}
final TabletClientPipeline pipeline = new TabletClientPipeline();
- client = pipeline.init(uuid);
+ client = pipeline.init(uuid, host, port);
chan = channelFactory.newChannel(pipeline);
ip2client.put(hostport, client); // This is guaranteed to return null.
}
@@ -1464,7 +1498,7 @@ public class AsyncKuduClient implements AutoCloseable {
public ArrayList<Void> call(final ArrayList<Void> arg) {
// Normally, now that we've shutdown() every client, all our caches should
// be empty since each shutdown() generates a DISCONNECTED event, which
- // causes TabletClientPipeline to call removeClientFromCache().
+ // causes TabletClientPipeline to call removeClientFromIpCache().
HashMap<String, TabletClient> logme = null;
synchronized (ip2client) {
if (!ip2client.isEmpty()) {
@@ -1542,12 +1576,12 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * Removes all the cache entries referred to the given client.
- * @param client The client for which we must invalidate everything.
- * @param remote The address of the remote peer, if known, or null.
+ * Removes the given client from the `ip2client` cache.
+ * @param client The client for which we must clear the ip cache
+ * @param remote The address of the remote peer, if known, or null
*/
- private void removeClientFromCache(final TabletClient client,
- final SocketAddress remote) {
+ private void removeClientFromIpCache(final TabletClient client,
+ final SocketAddress remote) {
if (remote == null) {
return; // Can't continue without knowing the remote address.
@@ -1584,19 +1618,24 @@ public class AsyncKuduClient implements AutoCloseable {
+ hostport + "), it was found that there was no entry"
+ " corresponding to " + remote + ". This shouldn't happen.");
}
+ }
- ArrayList<RemoteTablet> tablets = client2tablets.remove(client);
+ /**
+ * Call this method after encountering an error connecting to a tablet server so that we stop
+ * considering it a leader for the tablets it serves.
+ * @param client tablet server to use for demotion
+ */
+ void demoteAsLeaderForAllTablets(final TabletClient client) {
+ ArrayList<RemoteTablet> tablets = client2tablets.get(client);
if (tablets != null) {
// Make a copy so we don't need to synchronize on it while iterating.
RemoteTablet[] tablets_copy;
synchronized (tablets) {
tablets_copy = tablets.toArray(new RemoteTablet[tablets.size()]);
- tablets = null;
- // If any other thread still has a reference to `tablets', their
- // updates will be lost (and we don't care).
}
for (final RemoteTablet remoteTablet : tablets_copy) {
- remoteTablet.removeTabletServer(client);
+ // It will be a no-op if it's not already a leader.
+ remoteTablet.demoteLeader(client);
}
}
}
@@ -1620,8 +1659,8 @@ public class AsyncKuduClient implements AutoCloseable {
*/
private boolean disconnected = false;
- TabletClient init(String uuid) {
- final TabletClient client = new TabletClient(AsyncKuduClient.this, uuid);
+ TabletClient init(String uuid, String host, int port) {
+ final TabletClient client = new TabletClient(AsyncKuduClient.this, uuid, host, port);
if (defaultSocketReadTimeoutMs > 0) {
super.addLast("timeout-handler",
new ReadTimeoutHandler(timer,
@@ -1679,10 +1718,8 @@ public class AsyncKuduClient implements AutoCloseable {
remote = slowSearchClientIP(client);
}
- // Prevent the client from buffering requests while we invalidate
- // everything we have about it.
synchronized (client) {
- removeClientFromCache(client, remote);
+ removeClientFromIpCache(client, remote);
}
} catch (Exception e) {
log.error("Uncaught exception when handling a disconnection of " + getChannel(), e);
@@ -1766,7 +1803,7 @@ public class AsyncKuduClient implements AutoCloseable {
* succeeds.
*
* Subtleties:
- * We don't keep track of a TS after it disconnects (via removeTabletServer), so if we
+ * We don't keep track of a TS after it disconnects (via removeTabletClient), so if we
* haven't contacted one for 10 seconds (socket timeout), it will be removed from the list of
* tabletServers. This means that if the leader fails, we only have one other TS to "promote"
* or maybe none at all. This is partly why we then set leaderIndex to NO_LEADER_INDEX.
@@ -1793,7 +1830,7 @@ public class AsyncKuduClient implements AutoCloseable {
this.partition = partition;
}
- void refreshServers(Master.TabletLocationsPB tabletLocations) throws NonRecoverableException {
+ void refreshTabletClients(Master.TabletLocationsPB tabletLocations) throws NonRecoverableException {
synchronized (tabletServers) { // TODO not a fat lock with IP resolving in it
tabletServers.clear();
@@ -1820,9 +1857,9 @@ public class AsyncKuduClient implements AutoCloseable {
lookupExceptions.add(ex);
}
}
- leaderIndex = 0;
+
if (leaderIndex == NO_LEADER_INDEX) {
- LOG.warn("No leader provided for tablet " + getTabletIdAsString());
+ LOG.warn("No leader provided for tablet {}", getTabletIdAsString());
}
// If we found a tablet that doesn't contain a single location that we can resolve, there's
@@ -1846,19 +1883,48 @@ public class AsyncKuduClient implements AutoCloseable {
final ArrayList<RemoteTablet> tablets = client2tablets.get(client);
- if (tablets == null) {
- // We raced with removeClientFromCache and lost. The client we got was just disconnected.
- // Reconnect.
- addTabletClient(uuid, host, port, isLeader);
- } else {
- synchronized (tablets) {
- if (isLeader) {
- tabletServers.add(0, client);
- } else {
- tabletServers.add(client);
- }
- tablets.add(this);
+ synchronized (tablets) {
+ tabletServers.add(client);
+ if (isLeader) {
+ leaderIndex = tabletServers.size() - 1;
}
+ tablets.add(this);
+ }
+ }
+
+ /**
+ * Call this method when an existing TabletClient in this tablet's cache is found to be dead.
+ * It removes the passed TS from this tablet's cache and replaces it with a new instance of
+ * TabletClient. It will keep its leader status if it was already considered a leader.
+ * If the passed TabletClient was already removed, then this is a no-op.
+ * @param staleTs TS to reconnect to
+ * @throws UnknownHostException if we can't resolve server's hostname
+ */
+ void reconnectTabletClient(TabletClient staleTs) throws UnknownHostException {
+ assert (!staleTs.isAlive());
+
+ synchronized (tabletServers) {
+ int index = tabletServers.indexOf(staleTs);
+
+ if (index == -1) {
+ // Another thread already took care of it.
+ return;
+ }
+
+ boolean wasLeader = index == leaderIndex;
+
+ LOG.debug("Reconnecting to server {} for tablet {}. Was a leader? {}",
+ staleTs.getUuid(), getTabletIdAsString(), wasLeader);
+
+ boolean removed = removeTabletClient(staleTs);
+
+ if (!removed) {
+ LOG.debug("{} was already removed from tablet {}'s cache when reconnecting to it",
+ staleTs.getUuid(), getTabletIdAsString());
+ }
+
+ addTabletClient(staleTs.getUuid(), staleTs.getHost(),
+ staleTs.getPort(), wasLeader);
}
}
@@ -1873,7 +1939,7 @@ public class AsyncKuduClient implements AutoCloseable {
* @param ts A TabletClient that was disconnected.
* @return True if this method removed ts from the list, else false.
*/
- boolean removeTabletServer(TabletClient ts) {
+ boolean removeTabletClient(TabletClient ts) {
synchronized (tabletServers) {
// TODO unit test for this once we have the infra
int index = tabletServers.indexOf(ts);
@@ -1894,10 +1960,10 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
- * If the passed TabletClient is the current leader, then the next one in the list will be
- * "promoted" unless we're at the end of the list, in which case we set the leaderIndex to
- * NO_LEADER_INDEX which will force a call to the master.
- * @param ts A TabletClient that gave a sign that it isn't this tablet's leader.
+ * Clears the leader index if the passed tablet server is the current leader.
+ * If it is the current leader, then the next call to this tablet will have
+ * to query the master to find the new leader.
+ * @param ts a TabletClient that gave a sign that it isn't this tablet's leader
*/
void demoteLeader(TabletClient ts) {
synchronized (tabletServers) {
@@ -1905,15 +1971,17 @@ public class AsyncKuduClient implements AutoCloseable {
// If this TS was removed or we're already forcing a call to the master (meaning someone
// else beat us to it), then we just noop.
if (index == -1 || leaderIndex == NO_LEADER_INDEX) {
+ LOG.debug("{} couldn't be demoted as the leader for {}",
+ ts.getUuid(), getTabletIdAsString());
return;
}
if (leaderIndex == index) {
- if (leaderIndex + 1 == tabletServers.size()) {
- leaderIndex = NO_LEADER_INDEX;
- } else {
- leaderIndex++;
- }
+ leaderIndex = NO_LEADER_INDEX;
+ LOG.debug("{} was demoted as the leader for {}", ts.getUuid(), getTabletIdAsString());
+ } else {
+ LOG.debug("{} wasn't the leader for {}, current leader is at index {}", ts.getUuid(),
+ getTabletIdAsString(), leaderIndex);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
index cabca8d..80c4f67 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/AsyncKuduScanner.java
@@ -743,11 +743,13 @@ public final class AsyncKuduScanner {
.setBatchSizeBytes(batchSizeBytes);
break;
case NEXT:
+ setTablet(AsyncKuduScanner.this.tablet);
builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
.setCallSeqId(sequenceId)
.setBatchSizeBytes(batchSizeBytes);
break;
case CLOSING:
+ setTablet(AsyncKuduScanner.this.tablet);
builder.setScannerId(ZeroCopyLiteralByteString.wrap(scannerId))
.setBatchSizeBytes(0)
.setCloseScanner(true);
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
index e18eb55..3f91ece 100644
--- a/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
+++ b/java/kudu-client/src/main/java/org/kududb/client/TabletClient.java
@@ -26,8 +26,7 @@
package org.kududb.client;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Sets;
+import com.google.common.annotations.VisibleForTesting;
import com.stumbleupon.async.Deferred;
import org.jboss.netty.handler.timeout.ReadTimeoutException;
@@ -136,14 +135,20 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
private final String uuid;
+ private final String host;
+
+ private final int port;
+
private final long socketReadTimeoutMs;
private SecureRpcHelper secureRpcHelper;
- public TabletClient(AsyncKuduClient client, String uuid) {
+ public TabletClient(AsyncKuduClient client, String uuid, String host, int port) {
this.kuduClient = client;
this.uuid = uuid;
this.socketReadTimeoutMs = client.getDefaultSocketReadTimeoutMs();
+ this.host = host;
+ this.port = port;
}
<R> void sendRpc(KuduRpc<R> rpc) {
@@ -252,6 +257,22 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
return payload;
}
+ /**
+ * Quick and dirty way to close a connection to a tablet server, if it wasn't already closed.
+ */
+ @VisibleForTesting
+ void disconnect() {
+ Channel chancopy = chan;
+ if (chancopy != null && chancopy.isConnected()) {
+ Channels.disconnect(chancopy);
+ }
+ }
+
+ /**
+ * Forcefully shuts down the connection to this tablet server and fails all the outstanding RPCs.
+ * Only use when shutting down a client.
+ * @return deferred object to use to track the shutting down of this connection
+ */
public Deferred<Void> shutdown() {
// First, check whether we have RPCs in flight and cancel them.
for (Iterator<KuduRpc<?>> ite = rpcs_inflight.values().iterator(); ite
@@ -667,10 +688,12 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
private void failOrRetryRpc(final KuduRpc<?> rpc,
final ConnectionResetException exception) {
AsyncKuduClient.RemoteTablet tablet = rpc.getTablet();
+ // Note: as of the time of writing (03/11/16), a null tablet doesn't make sense; if we see a null
+ // tablet, it's because we didn't set it properly before calling sendRpc().
if (tablet == null) { // Can't retry, dunno where this RPC should go.
rpc.errback(exception);
} else {
- kuduClient.handleTabletNotFound(rpc, exception, this);
+ kuduClient.handleRetryableError(rpc, exception);
}
}
@@ -691,6 +714,9 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
cleanup(c);
} else {
LOG.error(getPeerUuidLoggingString() + "Unexpected exception from downstream on " + c, e);
+ // For any other exception, likely a connection error, we clear the leader state
+ // for those tablets that this TS is the cached leader of.
+ kuduClient.demoteAsLeaderForAllTablets(this);
}
if (c.isOpen()) {
Channels.close(c); // Will trigger channelClosed(), which will cleanup()
@@ -757,6 +783,23 @@ public class TabletClient extends ReplayingDecoder<VoidEnum> {
return uuid;
}
+ /**
+ * Returns this tablet server's port.
+ * @return a port number that this tablet server is bound to
+ */
+ int getPort() {
+ return port;
+ }
+
+ /**
+ * Returns this tablet server's hostname. We might get many hostnames from the master for a single
+ * TS, and this is the one we picked to connect to originally.
+ * @return a string that contains this tablet server's hostname
+ */
+ String getHost() {
+ return host;
+ }
+
public String toString() {
final StringBuilder buf = new StringBuilder(13 + 10 + 6 + 64 + 7 + 32 + 16 + 1 + 17 + 2 + 1);
buf.append("TabletClient@") // =13
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/ITClient.java b/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
new file mode 100644
index 0000000..37d513e
--- /dev/null
+++ b/java/kudu-client/src/test/java/org/kududb/client/ITClient.java
@@ -0,0 +1,396 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+package org.kududb.client;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Integration test for the client. RPCs are sent to Kudu from multiple threads while processes
+ * are restarted and failures are injected.
+ *
+ * By default this test runs for 60 seconds, but this can be changed by passing a different value
+ * in "itclient.runtime.seconds". For example:
+ * "mvn test -Dtest=ITClient -Ditclient.runtime.seconds=120".
+ */
+public class ITClient extends BaseKuduTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ITClient.class);
+
+ private static final String RUNTIME_PROPERTY_NAME = "itclient.runtime.seconds";
+ private static final long DEFAULT_RUNTIME_SECONDS = 60;
+ // Time we'll spend waiting at the end of the test for things to settle. Also the minimum this
+ // test can run for.
+ private static final long TEST_MIN_RUNTIME_SECONDS = 2;
+ private static final long TEST_TIMEOUT_SECONDS = 600000;
+
+ private static final String TABLE_NAME =
+ ITClient.class.getName() + "-" + System.currentTimeMillis();
+ // Counted down once at the end of the test to tell all threads to stop.
+ private static final CountDownLatch KEEP_RUNNING_LATCH = new CountDownLatch(1);
+ // Latch used to track if an error occurred and we need to stop the test early.
+ private static final CountDownLatch ERROR_LATCH = new CountDownLatch(1);
+
+ private static KuduClient localClient;
+ private static AsyncKuduClient localAsyncClient;
+ private static KuduTable table;
+ private static long runtimeInSeconds;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+
+ String runtimeProp = System.getProperty(RUNTIME_PROPERTY_NAME);
+ runtimeInSeconds = runtimeProp == null ? DEFAULT_RUNTIME_SECONDS : Long.parseLong(runtimeProp);
+
+ if (runtimeInSeconds < TEST_MIN_RUNTIME_SECONDS || runtimeInSeconds > TEST_TIMEOUT_SECONDS) {
+ Assert.fail("This test needs to run more more than " + TEST_MIN_RUNTIME_SECONDS + " seconds" +
+ " and less than " + TEST_TIMEOUT_SECONDS + " seconds");
+ }
+
+ LOG.info ("Test running for {} seconds", runtimeInSeconds);
+
+ BaseKuduTest.setUpBeforeClass();
+
+ // Client we're using has low tolerance for read timeouts but a
+ // higher overall operation timeout.
+ localAsyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(masterAddresses)
+ .defaultSocketReadTimeoutMs(500)
+ .defaultOperationTimeoutMs(20000)
+ .build();
+ localClient = new KuduClient(localAsyncClient);
+
+ CreateTableOptions builder = new CreateTableOptions().setNumReplicas(3);
+ table = localClient.createTable(TABLE_NAME, basicSchema, builder);
+ }
+
+ @Test(timeout = TEST_TIMEOUT_SECONDS)
+ public void test() throws Exception {
+ ArrayList<Thread> threads = new ArrayList<>();
+ Thread chaosThread = new Thread(new ChaosThread());
+ Thread writerThread = new Thread(new WriterThread());
+ Thread scannerThread = new Thread(new ScannerThread());
+
+ threads.add(chaosThread);
+ threads.add(writerThread);
+ threads.add(scannerThread);
+
+ for (Thread thread : threads) {
+ thread.start();
+ }
+
+ // await() returns true if the latch reaches 0; we don't want that.
+ Assert.assertFalse("Look for the last ERROR line in the log that comes from ITCLient",
+ ERROR_LATCH.await(runtimeInSeconds, TimeUnit.SECONDS));
+
+ // Indicate we want to stop, then wait a little bit for it to happen.
+ KEEP_RUNNING_LATCH.countDown();
+
+ for (Thread thread : threads) {
+ thread.interrupt();
+ thread.join();
+ }
+
+ AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
+ int rowCount = countRowsInScan(scannerBuilder);
+ Assert.assertTrue(rowCount + " should be higher than 0", rowCount > 0);
+ }
+
+ /**
+ * Logs an error message and triggers the error count down latch, stopping this test.
+ * @param message error message to print
+ * @param exception optional exception to print
+ */
+ private void reportError(String message, Exception exception) {
+ LOG.error(message, exception);
+ ERROR_LATCH.countDown();
+ }
+
+ /**
+ * Thread that introduces chaos in the cluster, one at a time.
+ */
+ class ChaosThread implements Runnable {
+
+ private final Random random = new Random();
+
+ @Override
+ public void run() {
+ try {
+ KEEP_RUNNING_LATCH.await(2, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ return;
+ }
+ while (KEEP_RUNNING_LATCH.getCount() > 0) {
+ try {
+ boolean shouldContinue;
+ if (System.currentTimeMillis() % 2 == 0) {
+ shouldContinue = restartTS();
+ } else {
+
+ shouldContinue = disconnectNode();
+ }
+ // TODO restarting the master currently finds more bugs. Also, adding it to the list makes
+ // it necessary to find a new weighting mechanism between the different chaos options.
+ // shouldContinue = restartMaster();
+
+ if (!shouldContinue) {
+ return;
+ }
+ KEEP_RUNNING_LATCH.await(5, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ return;
+ }
+
+ }
+ }
+
+ /**
+ * Failure injection. Picks a random tablet server from the client's cache and forcefully
+ * disconnects it.
+ * @return true if successfully completed or didn't find a server to disconnect, false if it
+ * encountered a failure
+ */
+ private boolean disconnectNode() {
+ try {
+ if (localAsyncClient.getTableClients().size() == 0) {
+ return true;
+ }
+
+ int tsToDisconnect = random.nextInt(localAsyncClient.getTableClients().size());
+ localAsyncClient.getTableClients().get(tsToDisconnect).disconnect();
+
+ } catch (Exception e) {
+ if (KEEP_RUNNING_LATCH.getCount() == 0) {
+ // Likely shutdown() related.
+ return false;
+ }
+ reportError("Couldn't disconnect a TS", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Forces the restart of a random tablet server.
+ * @return true if it successfully completed, false if it failed
+ */
+ private boolean restartTS() {
+ try {
+ BaseKuduTest.restartTabletServer(table);
+ } catch (Exception e) {
+ reportError("Couldn't restart a TS", e);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Forces the restart of the master.
+ * @return true if it successfully completed, false if it failed
+ */
+ private boolean restartMaster() {
+ try {
+ BaseKuduTest.restartLeaderMaster();
+ } catch (Exception e) {
+ reportError("Couldn't restart a master", e);
+ return false;
+ }
+ return true;
+ }
+
+ }
+
+ /**
+ * Thread that writes sequentially to the table. Every 10 rows it considers setting the flush mode
+ * to MANUAL_FLUSH or AUTO_FLUSH_SYNC.
+ */
+ class WriterThread implements Runnable {
+
+ private final KuduSession session = localClient.newSession();
+ private final Random random = new Random();
+ private int currentRowKey = 0;
+
+ @Override
+ public void run() {
+ session.setIgnoreAllDuplicateRows(true);
+ while (KEEP_RUNNING_LATCH.getCount() > 0) {
+ try {
+ OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRowKey));
+ if (hasRowErrorAndReport(resp)) {
+ return;
+ }
+ currentRowKey++;
+
+ // Every 10 rows we flush and change the flush mode randomly.
+ if (currentRowKey % 10 == 0) {
+
+ // First flush any accumulated rows before switching.
+ List<OperationResponse> responses = session.flush();
+ if (responses != null) {
+ for (OperationResponse batchedResp : responses) {
+ if (hasRowErrorAndReport(batchedResp)) {
+ return;
+ }
+ }
+ }
+
+ if (random.nextBoolean()) {
+ session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
+ } else {
+ session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
+ }
+ }
+ } catch (Exception e) {
+ if (KEEP_RUNNING_LATCH.getCount() == 0) {
+ // Likely shutdown() related.
+ return;
+ }
+ reportError("Got error while inserting row " + currentRowKey, e);
+ return;
+ }
+ }
+ }
+
+ private boolean hasRowErrorAndReport(OperationResponse resp) {
+ if (resp != null && resp.hasRowError()) {
+ reportError("The following RPC " + resp.getOperation().getRow() +
+ " returned this error: " + resp.getRowError(), null);
+ return true;
+ }
+ return false;
+ }
+ }
+
+ /**
+ * Thread that scans the table. Alternates randomly between random gets and full table scans.
+ */
+ class ScannerThread implements Runnable {
+
+ private final Random random = new Random();
+
+ // Updated by calling a full scan.
+ private int lastRowCount = 0;
+
+ @Override
+ public void run() {
+ while (KEEP_RUNNING_LATCH.getCount() > 0) {
+
+ boolean shouldContinue;
+
+ // Always scan until we find rows.
+ if (lastRowCount == 0 || random.nextBoolean()) {
+ shouldContinue = fullScan();
+ } else {
+ shouldContinue = randomGet();
+ }
+
+ if (!shouldContinue) {
+ return;
+ }
+
+ if (lastRowCount == 0) {
+ try {
+ KEEP_RUNNING_LATCH.await(50, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ // Test is stopping.
+ return;
+ }
+ }
+ }
+ }
+
+ /**
+ * Reads a random row that it knows exists (its key is smaller than lastRowCount).
+ * @return true if the thread should keep running, false if an error was reported
+ */
+ private boolean randomGet() {
+ int key = random.nextInt(lastRowCount);
+ KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
+ table.getSchema().getColumnByIndex(0), KuduPredicate.ComparisonOp.EQUAL, key);
+ KuduScanner scanner = localClient.newScannerBuilder(table).addPredicate(predicate).build();
+
+ List<RowResult> results = new ArrayList<>();
+ while (scanner.hasMoreRows()) {
+ try {
+ RowResultIterator ite = scanner.nextRows();
+ for (RowResult row : ite) {
+ results.add(row);
+ }
+ } catch (Exception e) {
+ return checkAndReportError("Got error while getting row " + key, e);
+ }
+ }
+
+ if (results.isEmpty() || results.size() > 1) {
+ reportError("Random get got 0 or many rows " + results.size() + " for key " + key, null);
+ return false;
+ }
+
+ int receivedKey = results.get(0).getInt(0);
+ if (receivedKey != key) {
+ reportError("Tried to get key " + key + " and received " + receivedKey, null);
+ return false;
+ }
+ return true;
+ }
+
+ /**
+ * Runs a full table scan and updates the lastRowCount.
+ * @return false if the row count regressed, otherwise true
+ */
+ private boolean fullScan() {
+ AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
+ try {
+ int rowCount = countRowsInScan(scannerBuilder);
+ if (rowCount < lastRowCount) {
+ reportError("Row count regressed: " + rowCount + " < " + lastRowCount, null);
+ return false;
+ }
+ lastRowCount = rowCount;
+ LOG.info("New row count {}", lastRowCount);
+ } catch (Exception e) {
+ checkAndReportError("Got error while row counting", e);
+ }
+ return true;
+ }
+
+ /**
+ * Checks the passed exception contains "Scanner not found". If it does then it returns true,
+ * else it reports the error and returns false.
+ * We need to do this because the scans in this client aren't fault tolerant.
+ * @param message message to print if the exception contains a real error
+ * @param e the exception to check
+ * @return true if the scanner failed because it wasn't found, otherwise false
+ */
+ private boolean checkAndReportError(String message, Exception e) {
+ if (!e.getCause().getMessage().contains("Scanner not found")) {
+ reportError(message, e);
+ return false;
+ }
+ return true;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/c4180bb0/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java b/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java
index 7b4c932..ff0097b 100644
--- a/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java
+++ b/java/kudu-client/src/test/java/org/kududb/client/TestAsyncKuduClient.java
@@ -17,6 +17,7 @@
package org.kududb.client;
import com.google.common.base.Charsets;
+import com.google.common.base.Stopwatch;
import com.google.protobuf.ByteString;
import com.stumbleupon.async.Deferred;
import org.junit.BeforeClass;
@@ -48,7 +49,7 @@ public class TestAsyncKuduClient extends BaseKuduTest {
assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
// 2. Disconnect the TabletClient.
- client.getTableClients().get(0).shutdown().join(DEFAULT_SLEEP);
+ disconnectAndWait();
// 3. Count again, it will trigger a re-connection and we should not hang or fail to scan.
assertEquals(0, countRowsInScan(client.newScannerBuilder(table).build()));
@@ -76,11 +77,26 @@ public class TestAsyncKuduClient extends BaseKuduTest {
rowCount, numRows);
// 4. Disconnect the TS.
- client.getTableClients().get(0).shutdown().join(DEFAULT_SLEEP);
+ disconnectAndWait();
+
// 5. Make sure that we can continue scanning and that we get the remaining rows back.
assertEquals(rowCount - numRows, countRowsInScan(scanner));
}
+ private void disconnectAndWait() throws InterruptedException {
+ client.getTableClients().get(0).disconnect();
+ Stopwatch sw = new Stopwatch().start();
+ while (sw.elapsedMillis() < DEFAULT_SLEEP) {
+ if (!client.getTableClients().isEmpty()) {
+ Thread.sleep(50);
+ continue;
+ } else {
+ break;
+ }
+ }
+ assertTrue(client.getTableClients().isEmpty());
+ }
+
@Test
public void testBadHostnames() throws Exception {
String badHostname = "some-unknown-host-hopefully";