You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/06/07 21:41:17 UTC

[kudu] 01/03: [java] Support table rename between scan token creation and rehydration

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f88fc0cb7a0d6c22f36dac613044ea50e0b9255b
Author: Will Berkeley <wd...@gmail.com>
AuthorDate: Wed Jun 5 11:07:56 2019 -0700

    [java] Support table rename between scan token creation and rehydration
    
    Previously, if a scan token was created against table 'foo', and table
    'foo' was then renamed to table 'bar', rehydrating the scan token into a
    scanner would either
    - fail, if there was no longer a table named 'foo', or
    - scan the wrong table, if some other table had since been renamed to 'foo'.
    This patch alters how the Java client manages scan tokens. It prefers to
    use the table id, rather than the table name, to identify a table, falling
    back to the name only when the token carries no id. This eliminates the
    above two problems. The table id has been added to the scan token PB to
    enable this.
    
    A follow-up will add support for the same in the C++ client.
    
    Change-Id: Iba5b0ac89b9cf98f4cf37c35d740eef744a85005
    Reviewed-on: http://gerrit.cloudera.org:8080/13518
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduClient.java    | 21 ++++++++++----
 .../apache/kudu/client/GetTableSchemaRequest.java  |  1 +
 .../apache/kudu/client/GetTableSchemaResponse.java | 12 ++++++++
 .../java/org/apache/kudu/client/KuduClient.java    | 12 ++++++++
 .../java/org/apache/kudu/client/KuduScanToken.java | 19 ++++++++++---
 .../java/org/apache/kudu/client/TestScanToken.java | 33 ++++++++++++++++++++--
 src/kudu/client/client.proto                       |  5 +++-
 7 files changed, 90 insertions(+), 13 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
index 51d65f9..de5ae10 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduClient.java
@@ -751,18 +751,18 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
-   * Gets a table's schema either by ID or by name. Note: the name must be
-   * provided, even if the RPC should be sent by ID.
+   * Gets a table's schema by ID or by name. If both are provided, table id is preferred.
+   *
    * @param tableName name of table
    * @param tableId immutable ID of table
    * @param parent parent RPC (for tracing), if any
    * @return a deferred object that yields the schema
    */
   private Deferred<KuduTable> getTableSchema(
-      @Nonnull final String tableName,
+      @Nullable final String tableName,
       @Nullable String tableId,
       @Nullable KuduRpc<?> parent) {
-    Preconditions.checkNotNull(tableName);
+    Preconditions.checkArgument(tableId != null || tableName != null);
 
     // Prefer a lookup by table ID over name, since the former is immutable.
     // For backwards compatibility with older tservers, we don't require authz
@@ -792,7 +792,7 @@ public class AsyncKuduClient implements AutoCloseable {
 
         LOG.debug("Opened table {}", resp.getTableId());
         return new KuduTable(AsyncKuduClient.this,
-            tableName,
+            resp.getTableName(),
             resp.getTableId(),
             resp.getSchema(),
             resp.getPartitionSchema(),
@@ -856,6 +856,17 @@ public class AsyncKuduClient implements AutoCloseable {
   }
 
   /**
+   * Open the table with the given id.
+   *
+   * @param id the id of the table to open
+   * @return a deferred KuduTable
+   */
+  Deferred<KuduTable> openTableById(String id) {
+    checkIsClosed();
+    return getTableSchema(null, id, null);
+  }
+
+  /**
    * Open the table with the given name.
    *
    * New range partitions created by other clients will immediately be available
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
index ae96f95..0a7f0f2 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaRequest.java
@@ -96,6 +96,7 @@ public class GetTableSchemaRequest extends KuduRpc<GetTableSchemaResponse> {
         tsUUID,
         schema,
         respBuilder.getTableId().toStringUtf8(),
+        respBuilder.getTableName(),
         respBuilder.getNumReplicas(),
         ProtobufHelper.pbToPartitionSchema(respBuilder.getPartitionSchema(), schema),
         respBuilder.hasAuthzToken() ? respBuilder.getAuthzToken() : null);
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
index b018e0a..a7ff1f1 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/GetTableSchemaResponse.java
@@ -28,6 +28,7 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
   private final Schema schema;
   private final PartitionSchema partitionSchema;
   private final String tableId;
+  private final String tableName;
   private final int numReplicas;
   private final SignedTokenPB authzToken;
 
@@ -36,6 +37,7 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
    * @param tsUUID the UUID of the tablet server that sent the response
    * @param schema the table's schema
    * @param tableId the UUID of the table in the response
+   * @param tableName the name of the table in the response
    * @param numReplicas the table's replication factor
    * @param partitionSchema the table's partition schema
    * @param authzToken an authorization token for use with this table
@@ -44,6 +46,7 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
                          String tsUUID,
                          Schema schema,
                          String tableId,
+                         String tableName,
                          int numReplicas,
                          PartitionSchema partitionSchema,
                          SignedTokenPB authzToken) {
@@ -51,6 +54,7 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
     this.schema = schema;
     this.partitionSchema = partitionSchema;
     this.tableId = tableId;
+    this.tableName = tableName;
     this.numReplicas = numReplicas;
     this.authzToken = authzToken;
   }
@@ -80,6 +84,14 @@ public class GetTableSchemaResponse extends KuduRpcResponse {
   }
 
   /**
+   * Get the table's name.
+   * @return the table's name
+   */
+  public String getTableName() {
+    return tableName;
+  }
+
+  /**
    * Get the table's replication factor.
    * @return the table's replication factor
    */
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
index 1ca2bbe..24a6abb 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduClient.java
@@ -231,6 +231,18 @@ public class KuduClient implements AutoCloseable {
   }
 
   /**
+   * Open the table with the given id.
+   *
+   * @param id the id of the table to open
+   * @return a KuduTable if the table exists
+   * @throws KuduException if anything went wrong
+   */
+  KuduTable openTableById(final String id) throws KuduException {
+    Deferred<KuduTable> d = asyncClient.openTableById(id);
+    return joinAndHandleException(d);
+  }
+
+  /**
    * Open the table with the given name.
    *
    * New range partitions created by other clients will immediately be available
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index 758d86e..adabf26 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -125,10 +125,15 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
    */
   public static String stringifySerializedToken(byte[] buf, KuduClient client) throws IOException {
     ScanTokenPB token = ScanTokenPB.parseFrom(CodedInputStream.newInstance(buf));
-    KuduTable table = client.openTable(token.getTableName());
+    KuduTable table = token.hasTableId() ? client.openTableById(token.getTableId()) :
+                                           client.openTable(token.getTableName());
 
     MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper("ScanToken")
-                                                   .add("table", token.getTableName());
+                                                   .add("table-name", token.getTableName());
+
+    if (token.hasTableId()) {
+      helper.add("table-id", token.getTableId());
+    }
 
     if (token.hasLowerBoundPrimaryKey() && !token.getLowerBoundPrimaryKey().isEmpty()) {
       helper.add("lower-bound-primary-key",
@@ -160,7 +165,8 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
         !message.getFeatureFlagsList().contains(ScanTokenPB.Feature.Unknown),
         "Scan token requires an unsupported feature. This Kudu client must be updated.");
 
-    KuduTable table = client.openTable(message.getTableName());
+    KuduTable table = message.hasTableId() ? client.openTableById(message.getTableId()) :
+                                             client.openTable(message.getTableName());
     KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
 
     List<Integer> columns = new ArrayList<>(message.getProjectedColumnsCount());
@@ -274,7 +280,11 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
 
   @Override
   public int compareTo(KuduScanToken other) {
-    if (!message.getTableName().equals(other.message.getTableName())) {
+    if (message.hasTableId() && other.message.hasTableId()) {
+      if (!message.getTableId().equals(other.message.getTableId())) {
+        throw new IllegalArgumentException("Scan tokens from different tables may not be compared");
+      }
+    } else if (!message.getTableName().equals(other.message.getTableName())) {
       throw new IllegalArgumentException("Scan tokens from different tables may not be compared");
     }
 
@@ -340,6 +350,7 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
 
       Client.ScanTokenPB.Builder proto = Client.ScanTokenPB.newBuilder();
 
+      proto.setTableId(table.getTableId());
       proto.setTableName(table.getName());
 
       // Map the column names or indices to actual columns in the table schema.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index 87a9086..da5758e 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -31,10 +31,13 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.kudu.test.ClientTestUtil.countRowsInScan;
 import static org.apache.kudu.test.ClientTestUtil.countScanTokenRows;
 import static org.apache.kudu.test.ClientTestUtil.createDefaultTable;
 import static org.apache.kudu.test.ClientTestUtil.createManyStringsSchema;
+import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
 import static org.apache.kudu.test.ClientTestUtil.loadDefaultTable;
+import static org.apache.kudu.test.ClientTestUtil.scanTableToStrings;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -95,7 +98,7 @@ public class TestScanToken {
 
       KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
       tokenBuilder.batchSizeBytes(0);
-      tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+      tokenBuilder.setProjectedColumnIndexes(ImmutableList.of());
       List<KuduScanToken> tokens = tokenBuilder.build();
       assertEquals(16, tokens.size());
 
@@ -165,7 +168,7 @@ public class TestScanToken {
     session.flush();
 
     KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
-    tokenBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
+    tokenBuilder.setProjectedColumnIndexes(ImmutableList.of());
     List<KuduScanToken> tokens = tokenBuilder.build();
     assertEquals(6, tokens.size());
     assertEquals('f' - 'a' + 'z' - 'h',
@@ -190,7 +193,7 @@ public class TestScanToken {
         new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64).nullable(false).key(false).build()
     ));
     CreateTableOptions createOptions = new CreateTableOptions();
-    createOptions.setRangePartitionColumns(ImmutableList.<String>of());
+    createOptions.setRangePartitionColumns(ImmutableList.of());
     createOptions.setNumReplicas(1);
     client.createTable(testTableName, schema, createOptions);
 
@@ -247,6 +250,30 @@ public class TestScanToken {
     token.intoScanner(client);
   }
 
+  /**
+   * Tests that it is possible to rehydrate a scan token after a table rename.
+   */
+  @Test
+  public void testScanTokensWithTableRename() throws Exception {
+    Schema schema = getBasicSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    KuduTable table = client.createTable(testTableName, schema, createOptions);
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(1, tokens.size());
+    KuduScanToken token = tokens.get(0);
+
+    // Rename the table.
+    client.alterTable(
+        testTableName,
+        new AlterTableOptions().renameTable(testTableName + "-renamed"));
+
+    assertEquals(0, countRowsInScan(token.intoScanner(client)));
+  }
+
   /** Test that scanRequestTimeout makes it from the scan token to the underlying Scanner class. */
   @Test
   public void testScanRequestTimeout() throws IOException {
diff --git a/src/kudu/client/client.proto b/src/kudu/client/client.proto
index 988a471..4db50d8 100644
--- a/src/kudu/client/client.proto
+++ b/src/kudu/client/client.proto
@@ -43,7 +43,10 @@ message ScanTokenPB {
   // The feature set used by this scan token.
   repeated Feature feature_flags = 1;
 
-  // The table to scan.
+  // The table to scan. To remain backwards compatible, clients should set
+  // both the table name and the table id, but should prefer to use the id
+  // when rehydrating the scanner because the id cannot change.
+  optional string table_id = 20;
   optional string table_name = 2;
 
   // Which columns to select.