You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2018/09/15 22:25:13 UTC
kudu git commit: KUDU-2095: [java] Add scanner keepAlive API to the
Java client
Repository: kudu
Updated Branches:
refs/heads/master 72a77bfbf -> 42db87b0b
KUDU-2095: [java] Add scanner keepAlive API to the Java client
This patch adds keepAlive methods to the
AsyncKuduScanner and KuduScanner. These methods
leverage a package private method added to the
AsyncKuduClient using a similar implementation
pattern to existing scan related RPCs. The behavior of
this implementation mimics the C++ client.
Change-Id: Ic802f556c8860cdd43ef5f794c8f3658259bd0be
Reviewed-on: http://gerrit.cloudera.org:8080/11436
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Grant Henke <gr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/42db87b0
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/42db87b0
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/42db87b0
Branch: refs/heads/master
Commit: 42db87b0b128c573b96e39615e7fa41227fea368
Parents: 72a77bf
Author: Grant Henke <gr...@apache.org>
Authored: Thu Sep 13 14:05:55 2018 -0500
Committer: Grant Henke <gr...@apache.org>
Committed: Sat Sep 15 22:12:37 2018 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AsyncKuduClient.java | 30 ++-
.../apache/kudu/client/AsyncKuduScanner.java | 88 ++++++-
.../org/apache/kudu/client/KuduScanner.java | 25 +-
.../org/apache/kudu/client/TestKuduClient.java | 233 +++++++++++++++----
.../apache/kudu/client/TestRemoteTablet.java | 21 ++
src/kudu/client/client.h | 13 +-
6 files changed, 347 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 8c1e032..62425a4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -1004,7 +1004,7 @@ public class AsyncKuduClient implements AutoCloseable {
/**
* Package-private access point for {@link AsyncKuduScanner}s to close themselves.
- * @param scanner the scanner to close
+ * @param scanner the scanner to close.
* @return a deferred object that indicates the completion of the request.
* The {@link AsyncKuduScanner.Response} can contain rows that were left to scan.
*/
@@ -1028,6 +1028,34 @@ public class AsyncKuduClient implements AutoCloseable {
}
/**
+ * Package-private access point for {@link AsyncKuduScanner}s to keep themselves
+ * alive on tablet servers.
+ * @param scanner the scanner to keep alive.
+ * @return a deferred object that indicates the completion of the request.
+ */
+ Deferred<Void> keepAlive(final AsyncKuduScanner scanner) {
+ checkIsClosed();
+ final RemoteTablet tablet = scanner.currentTablet();
+ // Getting a null tablet here without being in a closed state means we were in between tablets.
+ // If there is no scanner to keep alive, we still return Status.OK().
+ if (tablet == null) {
+ return Deferred.fromResult(null);
+ }
+
+ final KuduRpc<Void> keepAliveRequest = scanner.getKeepAliveRequest();
+ final ServerInfo info = tablet.getReplicaSelectedServerInfo(keepAliveRequest.getReplicaSelection());
+ if (info == null) {
+ return Deferred.fromResult(null);
+ }
+
+ final Deferred<Void> d = keepAliveRequest.getDeferred();
+ keepAliveRequest.attempt++;
+ RpcProxy.sendRpc(this, connectionCache.getConnection(
+ info, Connection.CredentialsPolicy.ANY_CREDENTIALS), keepAliveRequest);
+ return d;
+ }
+
+ /**
* Sends the provided {@link KuduRpc} to the tablet server hosting the leader
* of the tablet identified by the RPC's table and partition key.
*
http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index dd61bf4..804978e 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -43,6 +43,8 @@ import com.google.protobuf.Message;
import com.google.protobuf.UnsafeByteOperations;
import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
+import org.apache.kudu.tserver.Tserver.ScannerKeepAliveRequestPB;
+import org.apache.kudu.tserver.Tserver.ScannerKeepAliveResponsePB;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
@@ -662,14 +664,6 @@ public final class AsyncKuduScanner {
}
/**
- * Sets the name of the tabletSlice that's hosting {@code this.start_key}.
- * @param tablet The tabletSlice we're currently supposed to be scanning.
- */
- void setTablet(final RemoteTablet tablet) {
- this.tablet = tablet;
- }
-
- /**
* Invalidates this scanner and makes it assume it's no longer opened.
* When a TabletServer goes away while we're scanning it, or some other type
* of access problem happens, this method should be called so that the
@@ -704,6 +698,31 @@ public final class AsyncKuduScanner {
}
/**
+ * Keep the current remote scanner alive.
+ * <p>
+ * Keep the current remote scanner alive on the Tablet server for an
+ * additional time-to-live. This is useful if the interval in between
+ * nextRows() calls is big enough that the remote scanner might be garbage
+ * collected. The scanner time-to-live can be configured on the tablet
+ * server via the --scanner_ttl_ms configuration flag and has a default
+ * of 60 seconds.
+ * <p>
+ * This does not invalidate any previously fetched results.
+ * <p>
+ * Note that an error returned by this method should not be taken as indication
+ * that the scan has failed. Subsequent calls to nextRows() might still be successful,
+ * particularly if the scanner is configured to be fault tolerant.
+ * @return A deferred object that indicates the completion of the request.
+ * @throws IllegalStateException if the scanner is already closed.
+ */
+ public Deferred<Void> keepAlive() {
+ if (closed) {
+ throw new IllegalStateException("Scanner has already been closed");
+ }
+ return client.keepAlive(this);
+ }
+
+ /**
* Returns an RPC to fetch the next rows.
*/
KuduRpc<Response> getNextRowsRequest() {
@@ -718,6 +737,14 @@ public final class AsyncKuduScanner {
}
/**
+ * Returns an RPC to keep this scanner alive on the tablet server.
+ * @return a new {@link KeepAliveRequest}
+ */
+ KuduRpc<Void> getKeepAliveRequest() {
+ return new KeepAliveRequest(table, tablet);
+ }
+
+ /**
* Throws an exception if scanning already started.
* @throws IllegalStateException if scanning already started.
*/
@@ -796,6 +823,51 @@ public final class AsyncKuduScanner {
}
/**
+ * RPC sent out to keep a scanner alive on a TabletServer.
+ */
+ final class KeepAliveRequest extends KuduRpc<Void> {
+
+ KeepAliveRequest(KuduTable table, RemoteTablet tablet) {
+ super(table);
+ setTablet(tablet);
+ this.setTimeoutMillis(scanRequestTimeout);
+ }
+
+ @Override
+ String serviceName() {
+ return TABLET_SERVER_SERVICE_NAME;
+ }
+
+ @Override
+ String method() {
+ return "ScannerKeepAlive";
+ }
+
+ @Override
+ ReplicaSelection getReplicaSelection() {
+ return replicaSelection;
+ }
+
+ /** Serializes this request. */
+ @Override
+ Message createRequestPB() {
+ final ScannerKeepAliveRequestPB.Builder builder = ScannerKeepAliveRequestPB.newBuilder();
+ builder.setScannerId(UnsafeByteOperations.unsafeWrap(scannerId));
+ return builder.build();
+ }
+
+ @Override
+ Pair<Void, Object> deserialize(final CallResponse callResponse,
+ String tsUUID) throws KuduException {
+ ScannerKeepAliveResponsePB.Builder builder = ScannerKeepAliveResponsePB.newBuilder();
+ readProtobuf(callResponse.getPBMessage(), builder);
+ ScannerKeepAliveResponsePB resp = builder.build();
+ TabletServerErrorPB error = resp.hasError() ? resp.getError() : null;
+ return new Pair<Void, Object>(null, error);
+ }
+ }
+
+ /**
* RPC sent out to fetch the next rows from the TabletServer.
*/
final class ScanRequest extends KuduRpc<Response> {
http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
index 13602a5..209fada 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanner.java
@@ -51,18 +51,39 @@ public class KuduScanner {
* {@code Scanner} is done scanning), calling it again leads to an undefined
* behavior.
* @return a list of rows.
- * @throws KuduException if anything went wrong
+ * @throws KuduException if anything went wrong.
*/
public RowResultIterator nextRows() throws KuduException {
return KuduClient.joinAndHandleException(asyncScanner.nextRows());
}
/**
+ * Keep the current remote scanner alive.
+ * <p>
+ * Keep the current remote scanner alive on the Tablet server for an
+ * additional time-to-live. This is useful if the interval in between
+ * nextRows() calls is big enough that the remote scanner might be garbage
+ * collected. The scanner time-to-live can be configured on the tablet
+ * server via the --scanner_ttl_ms configuration flag and has a default
+ * of 60 seconds.
+ * <p>
+ * This does not invalidate any previously fetched results.
+ * <p>
+ * Note that an exception thrown by this method should not be taken as indication
+ * that the scan has failed. Subsequent calls to nextRows() might still be successful,
+ * particularly if the scanner is configured to be fault tolerant.
+ * @throws KuduException if anything went wrong.
+ */
+ public final void keepAlive() throws KuduException {
+ KuduClient.joinAndHandleException(asyncScanner.keepAlive());
+ }
+
+ /**
* Closes this scanner (don't forget to call this when you're done with it!).
* <p>
* Closing a scanner already closed has no effect.
* @return a deferred object that indicates the completion of the request
- * @throws KuduException if anything went wrong
+ * @throws KuduException if anything went wrong.
*/
public RowResultIterator close() throws KuduException {
return KuduClient.joinAndHandleException(asyncScanner.close());
http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
index 282ec03..80c0843 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduClient.java
@@ -53,20 +53,44 @@ import java.util.concurrent.Future;
import com.google.common.collect.ImmutableList;
import com.stumbleupon.async.Deferred;
+import org.apache.kudu.client.MiniKuduCluster.MiniKuduClusterBuilder;
import org.apache.kudu.util.TimestampUtil;
-import org.junit.Before;
+import org.junit.Rule;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.junit.rules.TestName;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.util.CapturingLogAppender;
import org.apache.kudu.util.DecimalUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestKuduClient extends BaseKuduTest {
- private static final String tableName = "TestKuduClient";
+ private static final Logger LOG = LoggerFactory.getLogger(TestKuduClient.class);
+
+ private static final String TABLE_NAME = "TestKuduClient";
+
+ private static final int SHORT_SCANNER_TTL_MS = 5000;
+ private static final int SHORT_SCANNER_GC_US = SHORT_SCANNER_TTL_MS * 100; // 10% of the TTL.
+
+ @Rule
+ public TestName testName = new TestName();
+
+ @Override
+ protected MiniKuduClusterBuilder getMiniClusterBuilder() {
+ MiniKuduClusterBuilder builder = super.getMiniClusterBuilder();
+ // Set a short scanner ttl for some tests.
+ if ("testKeepAlive".equals(testName.getMethodName()) ||
+ "testScannerExpiration".equals(testName.getMethodName())
+ ) {
+ LOG.info("Overriding scanner TTL and GC for testKeepAlive");
+ builder.addTserverFlag(String.format("--scanner_ttl_ms=%d", SHORT_SCANNER_TTL_MS));
+ builder.addTserverFlag(String.format("--scanner_gc_check_interval_us=%d", SHORT_SCANNER_GC_US));
+ }
+ return builder;
+ }
/**
* Test setting and reading the most recent propagated timestamp.
@@ -74,7 +98,7 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testLastPropagatedTimestamps() throws Exception {
// Scan a table to ensure a timestamp is propagated.
- KuduTable table = syncClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+ KuduTable table = syncClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
syncClient.newScannerBuilder(table).build().nextRows().getNumRows();
assertTrue(syncClient.hasLastPropagatedTimestamp());
assertTrue(client.hasLastPropagatedTimestamp());
@@ -104,22 +128,22 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testCreateDeleteTable() throws Exception {
// Check that we can create a table.
- syncClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
assertFalse(syncClient.getTablesList().getTablesList().isEmpty());
- assertTrue(syncClient.getTablesList().getTablesList().contains(tableName));
+ assertTrue(syncClient.getTablesList().getTablesList().contains(TABLE_NAME));
// Check that we can delete it.
- syncClient.deleteTable(tableName);
- assertFalse(syncClient.getTablesList().getTablesList().contains(tableName));
+ syncClient.deleteTable(TABLE_NAME);
+ assertFalse(syncClient.getTablesList().getTablesList().contains(TABLE_NAME));
// Check that we can re-recreate it, with a different schema.
List<ColumnSchema> columns = new ArrayList<>(basicSchema.getColumns());
columns.add(new ColumnSchema.ColumnSchemaBuilder("one more", Type.STRING).build());
Schema newSchema = new Schema(columns);
- syncClient.createTable(tableName, newSchema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, newSchema, getBasicCreateTableOptions());
// Check that we can open a table and see that it has the new schema.
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
assertEquals(newSchema.getColumnCount(), table.getSchema().getColumnCount());
assertTrue(table.getPartitionSchema().isSimpleRangePartitioning());
@@ -131,7 +155,6 @@ public class TestKuduClient extends BaseKuduTest {
newSchema.getColumn("column3_s").getCompressionAlgorithm());
}
-
/**
* Test creating a table with various invalid schema cases.
*/
@@ -148,7 +171,7 @@ public class TestKuduClient extends BaseKuduTest {
}
Schema schema = new Schema(cols);
try {
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
fail();
} catch (NonRecoverableException nre) {
assertThat(nre.toString(), containsString(
@@ -156,6 +179,122 @@ public class TestKuduClient extends BaseKuduTest {
}
}
+ /*
+ * Test the scanner behavior when a scanner is used beyond
+ * the scanner ttl without calling keepAlive.
+ * Note: The getMiniClusterBuilder override above depends on this method name.
+ */
+ @Test(timeout = 100000)
+ public void testScannerExpiration() throws Exception {
+ // Create a basic table and load it with data.
+ int numRows = 1000;
+ syncClient.createTable(
+ TABLE_NAME,
+ basicSchema,
+ new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+ KuduSession session = syncClient.newSession();
+ KuduTable table = syncClient.openTable(TABLE_NAME);
+
+ for (int i = 0; i < numRows; i++) {
+ Insert insert = createBasicSchemaInsert(table, i);
+ session.apply(insert);
+ }
+
+ KuduScanner scanner = new KuduScanner.KuduScannerBuilder(client, table)
+ .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+ .batchSizeBytes(100) // Use a small batch size so we can call nextRows many times.
+ .build();
+
+ // Initialize the scanner and verify we can read rows.
+ int rows = scanner.nextRows().getNumRows();
+ assertTrue("Scanner did not read any rows", rows > 0);
+
+ // Wait for the scanner to time out.
+ Thread.sleep(SHORT_SCANNER_TTL_MS * 2);
+
+ try {
+ scanner.nextRows();
+ fail("Exception was not thrown when accessing an expired scanner");
+ } catch (NonRecoverableException ex) {
+ assertThat(ex.getMessage(), containsString("Scanner not found"));
+ }
+
+ // Closing an expired scanner shouldn't throw an exception.
+ scanner.close();
+ }
+
+ /*
+ * Test keeping a scanner alive beyond scanner ttl.
+ * Note: The getMiniClusterBuilder override above depends on this method name.
+ */
+ @Test(timeout = 100000)
+ public void testKeepAlive() throws Exception {
+ // Create a basic table and load it with data.
+ int numRows = 1000;
+ syncClient.createTable(
+ TABLE_NAME,
+ basicSchema,
+ new CreateTableOptions().addHashPartitions(ImmutableList.of("key"), 2));
+ KuduSession session = syncClient.newSession();
+ KuduTable table = syncClient.openTable(TABLE_NAME);
+
+ for (int i = 0; i < numRows; i++) {
+ Insert insert = createBasicSchemaInsert(table, i);
+ session.apply(insert);
+ }
+
+ KuduScanner scanner = new KuduScanner.KuduScannerBuilder(client, table)
+ .replicaSelection(ReplicaSelection.CLOSEST_REPLICA)
+ .batchSizeBytes(100) // Use a small batch size so we can call nextRows many times.
+ .build();
+
+ // KeepAlive on uninitialized scanner should be ok.
+ scanner.keepAlive();
+ // Get the first batch and initialize the scanner
+ int accum = scanner.nextRows().getNumRows();
+
+ while (scanner.hasMoreRows()) {
+ int rows = scanner.nextRows().getNumRows();
+ accum += rows;
+ // Break when we are between tablets.
+ if (scanner.currentTablet() == null) {
+ LOG.info(String.format("Between tablets after scanning %d rows", accum));
+ break;
+ }
+ // Ensure we actually end up between tablets.
+ if (accum == numRows) {
+ fail("All rows were in a single tablet.");
+ }
+ }
+
+ // In between scanners now and should be ok.
+ scanner.keepAlive();
+
+ // Initialize the next scanner or keepAlive will have no effect.
+ accum += scanner.nextRows().getNumRows();
+
+ // Wait for longer than the scanner ttl calling keepAlive throughout.
+ // Each loop sleeps 25% of the scanner ttl and we loop 10 times to ensure
+ // we extend over 2x the scanner ttl.
+ for (int i = 0; i < 10; i++) {
+ Thread.sleep(SHORT_SCANNER_TTL_MS / 4);
+ scanner.keepAlive();
+ }
+
+ // Finish out the rows.
+ while (scanner.hasMoreRows()) {
+ accum += scanner.nextRows().getNumRows();
+ }
+ assertEquals("All rows were not scanned", numRows, accum);
+
+ // At this point the scanner is closed and there is nothing to keep alive.
+ try {
+ scanner.keepAlive();
+ fail("Exception was not thrown when calling keepAlive on a closed scanner");
+ } catch (IllegalStateException ex) {
+ assertThat(ex.getMessage(), containsString("Scanner has already been closed"));
+ }
+ }
/**
* Test creating a table with columns with different combinations of NOT NULL and
@@ -187,9 +326,9 @@ public class TestKuduClient extends BaseKuduTest {
.defaultValue("def")
.build());
Schema schema = new Schema(cols);
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
KuduSession session = syncClient.newSession();
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
// Insert various rows. '-' indicates leaving the row unset in the insert.
List<String> rows = ImmutableList.of(
@@ -244,10 +383,10 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testStrings() throws Exception {
Schema schema = createManyStringsSchema();
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
KuduSession session = syncClient.newSession();
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
for (int i = 0; i < 100; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
@@ -298,10 +437,10 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testUTF8() throws Exception {
Schema schema = createManyStringsSchema();
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
KuduSession session = syncClient.newSession();
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
row.addString("key", "กขฃคฅฆง"); // some thai
@@ -325,12 +464,12 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testBinaryColumns() throws Exception {
Schema schema = createSchemaWithBinaryColumns();
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
byte[] testArray = new byte[] {1, 2, 3, 4, 5, 6 ,7, 8, 9};
KuduSession session = syncClient.newSession();
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
for (int i = 0; i < 100; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
@@ -368,12 +507,12 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testTimestampColumns() throws Exception {
Schema schema = createSchemaWithTimestampColumns();
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
List<Long> timestamps = new ArrayList<>();
KuduSession session = syncClient.newSession();
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
long lastTimestamp = 0;
for (int i = 0; i < 100; i++) {
Insert insert = table.newInsert();
@@ -416,10 +555,10 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testDecimalColumns() throws Exception {
Schema schema = createSchemaWithDecimalColumns();
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
KuduSession session = syncClient.newSession();
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
// Verify ColumnTypeAttributes
assertEquals(DecimalUtil.MAX_DECIMAL128_PRECISION,
@@ -455,8 +594,8 @@ public class TestKuduClient extends BaseKuduTest {
*/
@Test
public void testScanWithLimit() throws Exception {
- syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange());
- KuduTable table = syncClient.openTable(tableName);
+ syncClient.createTable(TABLE_NAME, basicSchema, getBasicTableOptionsWithNonCoveredRange());
+ KuduTable table = syncClient.openTable(TABLE_NAME);
KuduSession session = syncClient.newSession();
int num_rows = 100;
for (int key = 0; key < num_rows; key++) {
@@ -505,11 +644,11 @@ public class TestKuduClient extends BaseKuduTest {
@Test
public void testScanWithPredicates() throws Exception {
Schema schema = createManyStringsSchema();
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
KuduSession session = syncClient.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
for (int i = 0; i < 100; i++) {
Insert insert = table.newInsert();
PartialRow row = insert.getRow();
@@ -628,11 +767,11 @@ public class TestKuduClient extends BaseKuduTest {
*/
@Test(timeout = 100000)
public void testScanNonCoveredTable() throws Exception {
- syncClient.createTable(tableName, basicSchema, getBasicTableOptionsWithNonCoveredRange());
+ syncClient.createTable(TABLE_NAME, basicSchema, getBasicTableOptionsWithNonCoveredRange());
KuduSession session = syncClient.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
for (int key = 0; key < 100; key++) {
session.apply(createBasicSchemaInsert(table, key));
@@ -657,8 +796,8 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testAutoClose() throws Exception {
try (KuduClient localClient = new KuduClient.KuduClientBuilder(masterAddresses).build()) {
- localClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
- KuduTable table = localClient.openTable(tableName);
+ localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
+ KuduTable table = localClient.openTable(TABLE_NAME);
KuduSession session = localClient.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
@@ -666,7 +805,7 @@ public class TestKuduClient extends BaseKuduTest {
session.apply(insert);
}
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
AsyncKuduScanner scanner = new AsyncKuduScanner.AsyncKuduScannerBuilder(client, table).build();
assertEquals(1, countRowsInScan(scanner));
}
@@ -730,7 +869,7 @@ public class TestKuduClient extends BaseKuduTest {
.build();
long buildTime = (System.nanoTime() - startTime) / 1000000000L;
assertTrue("Building KuduClient is slow, maybe netty get stuck", buildTime < 3);
- localClient.createTable(tableName, basicSchema, getBasicCreateTableOptions());
+ localClient.createTable(TABLE_NAME, basicSchema, getBasicCreateTableOptions());
Thread[] threads = new Thread[4];
for (int t = 0; t < 4; t++) {
final int id = t;
@@ -738,7 +877,7 @@ public class TestKuduClient extends BaseKuduTest {
@Override
public void run() {
try {
- KuduTable table = localClient.openTable(tableName);
+ KuduTable table = localClient.openTable(TABLE_NAME);
KuduSession session = localClient.newSession();
session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC);
for (int i = 0; i < 100; i++) {
@@ -761,7 +900,7 @@ public class TestKuduClient extends BaseKuduTest {
@Test(expected=IllegalArgumentException.class)
public void testNoDefaultPartitioning() throws Exception {
- syncClient.createTable(tableName, basicSchema, new CreateTableOptions());
+ syncClient.createTable(TABLE_NAME, basicSchema, new CreateTableOptions());
}
@Test(timeout = 100000)
@@ -773,8 +912,8 @@ public class TestKuduClient extends BaseKuduTest {
upper.addInt("key", 1);
options.addRangePartition(lower, upper);
- syncClient.createTable(tableName, basicSchema, options);
- KuduTable table = syncClient.openTable(tableName);
+ syncClient.createTable(TABLE_NAME, basicSchema, options);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
// Count the number of tablets.
KuduScanToken.KuduScanTokenBuilder tokenBuilder = syncClient.newScanTokenBuilder(table);
@@ -791,7 +930,7 @@ public class TestKuduClient extends BaseKuduTest {
upper = basicSchema.newPartialRow();
lower.addInt("key", 1);
alter.addRangePartition(lower, upper);
- alterClient.alterTable(tableName, alter);
+ alterClient.alterTable(TABLE_NAME, alter);
}
// Count the number of tablets. The result should still be the same, since
@@ -801,7 +940,7 @@ public class TestKuduClient extends BaseKuduTest {
assertEquals(1, tokens.size());
// Reopen the table and count the tablets again. The new tablet should now show up.
- table = syncClient.openTable(tableName);
+ table = syncClient.openTable(TABLE_NAME);
tokenBuilder = syncClient.newScanTokenBuilder(table);
tokens = tokenBuilder.build();
assertEquals(2, tokens.size());
@@ -810,7 +949,7 @@ public class TestKuduClient extends BaseKuduTest {
@Test(timeout = 100000)
public void testCreateTableWithConcurrentInsert() throws Exception {
KuduTable table = syncClient.createTable(
- tableName, createManyStringsSchema(), getBasicCreateTableOptions().setWait(false));
+ TABLE_NAME, createManyStringsSchema(), getBasicCreateTableOptions().setWait(false));
// Insert a row.
//
@@ -827,13 +966,13 @@ public class TestKuduClient extends BaseKuduTest {
// This won't do anything useful (i.e. if the insert succeeds, we know the
// table has been created), but it's here for additional code coverage.
- assertTrue(syncClient.isCreateTableDone(tableName));
+ assertTrue(syncClient.isCreateTableDone(TABLE_NAME));
}
@Test(timeout = 100000)
public void testCreateTableWithConcurrentAlter() throws Exception {
// Kick off an asynchronous table creation.
- Deferred<KuduTable> d = client.createTable(tableName,
+ Deferred<KuduTable> d = client.createTable(TABLE_NAME,
createManyStringsSchema(), getBasicCreateTableOptions());
// Rename the table that's being created to make sure it doesn't interfere
@@ -843,7 +982,7 @@ public class TestKuduClient extends BaseKuduTest {
// actually exists.
while (true) {
try {
- syncClient.alterTable(tableName,
+ syncClient.alterTable(TABLE_NAME,
new AlterTableOptions().renameTable("foo"));
break;
} catch (KuduException e) {
@@ -894,7 +1033,7 @@ public class TestKuduClient extends BaseKuduTest {
final ReplicaSelection replicaSelection)
throws Exception {
Schema schema = createManyStringsSchema();
- syncClient.createTable(tableName, schema, getBasicCreateTableOptions());
+ syncClient.createTable(TABLE_NAME, schema, getBasicCreateTableOptions());
final int tasksNum = 4;
List<Callable<Void>> callables = new ArrayList<>();
@@ -906,7 +1045,7 @@ public class TestKuduClient extends BaseKuduTest {
// in the given flush mode.
KuduSession session = syncClient.newSession();
session.setFlushMode(flushMode);
- KuduTable table = syncClient.openTable(tableName);
+ KuduTable table = syncClient.openTable(TABLE_NAME);
for (int i = 0; i < 3; i++) {
for (int j = 100 * i; j < 100 * (i + 1); j++) {
Insert insert = table.newInsert();
http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
index 9ba6d00..c835f27 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestRemoteTablet.java
@@ -120,6 +120,27 @@ public class TestRemoteTablet {
tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid());
}
+ // AsyncKuduClient has methods like scanNextRows, keepAlive, and closeScanner that rely on
+ // RemoteTablet.getReplicaSelectedServerInfo to be deterministic given the same state.
+ // This ensures follow up calls are routed to the same server with the scanner open.
+ // This test ensures that remains true.
+ @Test
+ public void testGetReplicaSelectedServerInfoDeterminism() {
+ RemoteTablet tabletWithLocal = getTablet(0, 0);
+ verifyGetReplicaSelectedServerInfoDeterminism(tabletWithLocal);
+
+ RemoteTablet tabletWithRemote = getTablet(0, -1);
+ verifyGetReplicaSelectedServerInfoDeterminism(tabletWithRemote);
+ }
+
+ private void verifyGetReplicaSelectedServerInfoDeterminism(RemoteTablet tablet) {
+ String init = tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid();
+ for (int i = 0; i < 10; i++) {
+ String next = tablet.getReplicaSelectedServerInfo(ReplicaSelection.CLOSEST_REPLICA).getUuid();
+ assertEquals("getReplicaSelectedServerInfo was not deterministic", init, next);
+ }
+ }
+
@Test
public void testToString() {
RemoteTablet tablet = getTablet(0, 1);
http://git-wip-us.apache.org/repos/asf/kudu/blob/42db87b0/src/kudu/client/client.h
----------------------------------------------------------------------
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index 795aae5..e479aa9 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -1953,11 +1953,13 @@ class KUDU_EXPORT KuduScanner {
/// Keep the current remote scanner alive.
///
- /// Keep the current remote scanner alive on the Tablet server
- /// for an additional time-to-live (set by a configuration flag on
- /// the tablet server). This is useful if the interval in between
+ /// Keep the current remote scanner alive on the Tablet server for an
+ /// additional time-to-live. This is useful if the interval in between
/// NextBatch() calls is big enough that the remote scanner might be garbage
- /// collected (default TTL is set to 60 secs.).
+ /// collected. The scanner time-to-live can be configured on the tablet
+ /// server via the --scanner_ttl_ms configuration flag and has a default
+ /// of 60 seconds.
+ ///
/// This does not invalidate any previously fetched results.
///
/// @return Operation result status. In particular, this method returns
@@ -1965,7 +1967,8 @@ class KUDU_EXPORT KuduScanner {
/// TabletServer was unreachable, for any reason. Note that a non-OK
/// status returned by this method should not be taken as indication
/// that the scan has failed. Subsequent calls to NextBatch() might
- /// still be successful, particularly if SetFaultTolerant() has been called.
+ /// still be successful, particularly if the scanner is configured to be
+ /// fault tolerant.
Status KeepAlive();
/// Close the scanner.