You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by gr...@apache.org on 2019/06/14 19:51:51 UTC

[kudu] branch branch-1.10.x updated: [java] Favor column ids over column names in scan tokens

This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch branch-1.10.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.10.x by this push:
     new a110653  [java] Favor column ids over column names in scan tokens
a110653 is described below

commit a1106532113b7e9951a5b0632e7ebdc3ad19c613
Author: Will Berkeley <wd...@gmail.com>
AuthorDate: Thu Jun 6 14:24:22 2019 -0700

    [java] Favor column ids over column names in scan tokens
    
    Previously, a scan token would use the column name to map a column in its
    projection to a column in the target table's current schema.
    Therefore, a scan token couldn't be used if a column were renamed
    between when the token is cut and when it is rehydrated into a scanner.
    This adjusts the Java client to prefer ids to names, to fix this
    behavior. Since this involves including column ids when serializing
    columns to PBs as part of scan tokens, but the server does not permit
    clients to send column ids in most cases, this patch adds a new
    serialization flag (SCHEMA_PB_WITHOUT_ID) that omits column ids; the
    requests that must not carry ids set it, while scan tokens do not.
    
    Note that this patch does not make _scanners_ resistant to column name
    changes. If a scanner is opened against a table and a column name
    changes on a replica before the scanner opens a server-side scanner on
    it, the scan will fail if the column is in the projection.
    
    A follow-up will add similar capability to the C++ client.
    
    Change-Id: Ib3f05a4175c7e7bfaec2cbd3586723e6de3823f0
    Reviewed-on: http://gerrit.cloudera.org:8080/13562
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
    (cherry picked from commit a97276f29697ca32f599053d0e0aaa6d1e521a63)
    Reviewed-on: http://gerrit.cloudera.org:8080/13646
    Reviewed-by: Grant Henke <gr...@apache.org>
---
 .../org/apache/kudu/client/AlterTableOptions.java  |   6 +-
 .../org/apache/kudu/client/CreateTableRequest.java |   6 +-
 .../java/org/apache/kudu/client/KuduScanToken.java |  70 ++++++---
 .../java/org/apache/kudu/client/Operation.java     |   3 +-
 .../org/apache/kudu/client/ProtobufHelper.java     |  32 ++--
 .../org/apache/kudu/client/TestKeyEncoding.java    |   9 +-
 .../java/org/apache/kudu/client/TestScanToken.java | 170 ++++++++++++++++++---
 7 files changed, 229 insertions(+), 67 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
index abf5538..cb92c29 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AlterTableOptions.java
@@ -297,7 +297,8 @@ public class AlterTableOptions {
     step.setAddRangePartition(builder);
     if (!pb.hasSchema()) {
       pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema(),
-          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
+          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
+                     SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
     }
     return this;
   }
@@ -357,7 +358,8 @@ public class AlterTableOptions {
     step.setDropRangePartition(builder);
     if (!pb.hasSchema()) {
       pb.setSchema(ProtobufHelper.schemaToPb(lowerBound.getSchema(),
-          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
+          EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
+                     SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
     }
     return this;
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
index c270f82..f5c5eb5 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/CreateTableRequest.java
@@ -18,6 +18,7 @@
 package org.apache.kudu.client;
 
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.List;
 
 import com.google.protobuf.Message;
@@ -25,6 +26,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.jboss.netty.util.Timer;
 
 import org.apache.kudu.Schema;
+import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.master.Master;
 import org.apache.kudu.util.Pair;
 
@@ -57,7 +59,9 @@ class CreateTableRequest extends KuduRpc<CreateTableResponse> {
   @Override
   Message createRequestPB() {
     this.builder.setName(this.name);
-    this.builder.setSchema(ProtobufHelper.schemaToPb(this.schema));
+    this.builder.setSchema(
+        ProtobufHelper.schemaToPb(this.schema,
+                                  EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
     return this.builder.build();
   }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index adabf26..bfb0712 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -19,11 +19,11 @@ package org.apache.kudu.client;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.UnsafeByteOperations;
@@ -32,7 +32,9 @@ import org.apache.yetus.audience.InterfaceStability;
 
 import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Common;
+import org.apache.kudu.Schema;
 import org.apache.kudu.client.Client.ScanTokenPB;
+import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.util.Pair;
 
 /**
@@ -159,6 +161,29 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
     return helper.toString();
   }
 
+  private static List<Integer> computeProjectedColumnIndexesForScanner(ScanTokenPB message,
+                                                                       Schema schema) {
+    List<Integer> columns = new ArrayList<>(message.getProjectedColumnsCount());
+    for (Common.ColumnSchemaPB colSchemaFromPb : message.getProjectedColumnsList()) {
+      int colIdx = colSchemaFromPb.hasId() && schema.hasColumnIds() ?
+          schema.getColumnIndex(colSchemaFromPb.getId()) :
+          schema.getColumnIndex(colSchemaFromPb.getName());
+      ColumnSchema colSchema = schema.getColumnByIndex(colIdx);
+      if (colSchemaFromPb.getType() != colSchema.getType().getDataType(colSchema.getTypeAttributes())) {
+        throw new IllegalStateException(String.format(
+            "invalid type %s for column '%s' in scan token, expected: %s",
+            colSchemaFromPb.getType().name(), colSchemaFromPb.getName(), colSchema.getType().name()));
+      }
+      if (colSchemaFromPb.getIsNullable() != colSchema.isNullable()) {
+        throw new IllegalStateException(String.format(
+            "invalid nullability for column '%s' in scan token, expected: %s",
+            colSchemaFromPb.getName(), colSchema.isNullable() ? "NULLABLE" : "NOT NULL"));
+      }
+      columns.add(colIdx);
+    }
+    return columns;
+  }
+
   private static KuduScanner pbIntoScanner(ScanTokenPB message,
                                            KuduClient client) throws KuduException {
     Preconditions.checkArgument(
@@ -169,25 +194,9 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
                                              client.openTable(message.getTableName());
     KuduScanner.KuduScannerBuilder builder = client.newScannerBuilder(table);
 
-    List<Integer> columns = new ArrayList<>(message.getProjectedColumnsCount());
-    for (Common.ColumnSchemaPB column : message.getProjectedColumnsList()) {
-      int columnIdx = table.getSchema().getColumnIndex(column.getName());
-      ColumnSchema schema = table.getSchema().getColumnByIndex(columnIdx);
-      if (column.getType() != schema.getType().getDataType(schema.getTypeAttributes())) {
-        throw new IllegalStateException(String.format(
-            "invalid type %s for column '%s' in scan token, expected: %s",
-            column.getType().name(), column.getName(), schema.getType().name()));
-      }
-      if (column.getIsNullable() != schema.isNullable()) {
-        throw new IllegalStateException(String.format(
-            "invalid nullability for column '%s' in scan token, expected: %s",
-            column.getName(), column.getIsNullable() ? "NULLABLE" : "NOT NULL"));
-
-      }
 
-      columns.add(columnIdx);
-    }
-    builder.setProjectedColumnIndexes(columns);
+    builder.setProjectedColumnIndexes(
+        computeProjectedColumnIndexesForScanner(message, table.getSchema()));
 
     for (Common.ColumnPredicatePB pred : message.getColumnPredicatesList()) {
       builder.addPredicate(KuduPredicate.fromPB(table.getSchema(), pred));
@@ -355,21 +364,32 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
 
       // Map the column names or indices to actual columns in the table schema.
       // If the user did not set either projection, then scan all columns.
+      Schema schema = table.getSchema();
       if (projectedColumnNames != null) {
         for (String columnName : projectedColumnNames) {
-          ColumnSchema columnSchema = table.getSchema().getColumn(columnName);
+          ColumnSchema columnSchema = schema.getColumn(columnName);
           Preconditions.checkArgument(columnSchema != null, "unknown column i%s", columnName);
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema);
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                                    schema.hasColumnIds() ? schema.getColumnId(columnName) : -1,
+                                    columnSchema);
         }
       } else if (projectedColumnIndexes != null) {
         for (int columnIdx : projectedColumnIndexes) {
-          ColumnSchema columnSchema = table.getSchema().getColumnByIndex(columnIdx);
+          ColumnSchema columnSchema = schema.getColumnByIndex(columnIdx);
           Preconditions.checkArgument(columnSchema != null, "unknown column index %s", columnIdx);
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), columnSchema);
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                                    schema.hasColumnIds() ?
+                                        schema.getColumnId(columnSchema.getName()) :
+                                        -1,
+                                    columnSchema);
         }
       } else {
-        for (ColumnSchema column : table.getSchema().getColumns()) {
-          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(), column);
+        for (ColumnSchema column : schema.getColumns()) {
+          ProtobufHelper.columnToPb(proto.addProjectedColumnsBuilder(),
+                                    schema.hasColumnIds() ?
+                                        schema.getColumnId(column.getName()) :
+                                        -1,
+                                    column);
         }
       }
 
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
index bb655f0..998692b 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/Operation.java
@@ -290,7 +290,8 @@ public abstract class Operation extends KuduRpc<OperationResponse> {
 
     Tserver.WriteRequestPB.Builder requestBuilder = Tserver.WriteRequestPB.newBuilder();
     requestBuilder.setSchema(ProtobufHelper.schemaToPb(schema,
-        EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT)));
+        EnumSet.of(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_COMMENT,
+                   SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID)));
     requestBuilder.setRowOperations(rowOps);
     return requestBuilder;
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
index ed3263d..45ebe20 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/ProtobufHelper.java
@@ -46,7 +46,8 @@ public class ProtobufHelper {
    * The flags that are not included while serializing.
    */
   public enum SchemaPBConversionFlags {
-    SCHEMA_PB_WITHOUT_COMMENT;
+    SCHEMA_PB_WITHOUT_COMMENT,
+    SCHEMA_PB_WITHOUT_ID
   }
 
   /**
@@ -58,13 +59,13 @@ public class ProtobufHelper {
     return schemaToListPb(schema, EnumSet.noneOf(SchemaPBConversionFlags.class));
   }
 
-  public static List<Common.ColumnSchemaPB> schemaToListPb(
-      Schema schema, EnumSet<SchemaPBConversionFlags> flags) {
-    ArrayList<Common.ColumnSchemaPB> columns =
-        new ArrayList<Common.ColumnSchemaPB>(schema.getColumnCount());
+  public static List<Common.ColumnSchemaPB> schemaToListPb(Schema schema,
+                                                           EnumSet<SchemaPBConversionFlags> flags) {
+    ArrayList<Common.ColumnSchemaPB> columns = new ArrayList<>(schema.getColumnCount());
     Common.ColumnSchemaPB.Builder schemaBuilder = Common.ColumnSchemaPB.newBuilder();
     for (ColumnSchema col : schema.getColumns()) {
-      columns.add(columnToPb(schemaBuilder, col, flags));
+      int id = schema.hasColumnIds() ? schema.getColumnId(col.getName()) : -1;
+      columns.add(columnToPb(schemaBuilder, id, col, flags));
       schemaBuilder.clear();
     }
     return columns;
@@ -74,24 +75,30 @@ public class ProtobufHelper {
     return schemaToPb(schema, EnumSet.noneOf(SchemaPBConversionFlags.class));
   }
 
-  public static Common.SchemaPB schemaToPb(
-      Schema schema, EnumSet<SchemaPBConversionFlags> flags) {
+  public static Common.SchemaPB schemaToPb(Schema schema,
+                                           EnumSet<SchemaPBConversionFlags> flags) {
     Common.SchemaPB.Builder builder = Common.SchemaPB.newBuilder();
     builder.addAllColumns(schemaToListPb(schema, flags));
     return builder.build();
   }
 
   public static Common.ColumnSchemaPB columnToPb(ColumnSchema column) {
-    return columnToPb(Common.ColumnSchemaPB.newBuilder(), column);
+    return columnToPb(Common.ColumnSchemaPB.newBuilder(), -1, column);
   }
 
   public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder schemaBuilder,
+                                                 int colId,
                                                  ColumnSchema column) {
-    return columnToPb(schemaBuilder, column, EnumSet.noneOf(SchemaPBConversionFlags.class));
+    return columnToPb(schemaBuilder,
+                      colId,
+                      column,
+                      EnumSet.noneOf(SchemaPBConversionFlags.class));
   }
 
   public static Common.ColumnSchemaPB columnToPb(Common.ColumnSchemaPB.Builder schemaBuilder,
-      ColumnSchema column, EnumSet<SchemaPBConversionFlags> flags) {
+                                                 int colId,
+                                                 ColumnSchema column,
+                                                 EnumSet<SchemaPBConversionFlags> flags) {
     schemaBuilder
         .setName(column.getName())
         .setType(column.getWireType())
@@ -99,6 +106,9 @@ public class ProtobufHelper {
         .setIsNullable(column.isNullable())
         .setCfileBlockSize(column.getDesiredBlockSize());
 
+    if (!flags.contains(SchemaPBConversionFlags.SCHEMA_PB_WITHOUT_ID) && colId >= 0) {
+      schemaBuilder.setId(colId);
+    }
     if (column.getEncoding() != null) {
       schemaBuilder.setEncoding(column.getEncoding().getInternalPbType());
     }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
index 4caf6dd..2b80d4a 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKeyEncoding.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.math.BigDecimal;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 
 import com.google.common.collect.ImmutableList;
@@ -37,6 +38,7 @@ import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.client.PartitionSchema.HashBucketSchema;
 import org.apache.kudu.client.PartitionSchema.RangeSchema;
+import org.apache.kudu.client.ProtobufHelper.SchemaPBConversionFlags;
 import org.apache.kudu.util.DecimalUtil;
 
 public class TestKeyEncoding {
@@ -55,9 +57,10 @@ public class TestKeyEncoding {
     int i = 0;
     Common.SchemaPB.Builder pb = Common.SchemaPB.newBuilder();
     for (ColumnSchemaBuilder column : columns) {
-      Common.ColumnSchemaPB.Builder columnPb =
-          ProtobufHelper.columnToPb(column.build()).toBuilder();
-      columnPb.setId(i++);
+      Common.ColumnSchemaPB columnPb =
+          ProtobufHelper.columnToPb(Common.ColumnSchemaPB.newBuilder(),
+                                    i++,
+                                    column.build());
       pb.addColumns(columnPb);
     }
     return ProtobufHelper.pbToSchema(pb.build());
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index 1d2da18..c203e95 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -215,41 +215,64 @@ public class TestScanToken {
       assertTrue(e.getMessage().contains("Unknown column"));
     }
 
-    // Add back the column with the wrong type.
+    // Add a column with the same name, type, and nullability. It will have a different id-- it's a
+    // different column-- so the scan token will fail.
     client.alterTable(
         testTableName,
-        new AlterTableOptions().addColumn(
-            new ColumnSchema.ColumnSchemaBuilder("a", Type.STRING).nullable(true).build()));
+        new AlterTableOptions()
+            .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
+                .nullable(false)
+                .defaultValue(0L).build()));
     try {
       token.intoScanner(client);
       fail();
-    } catch (IllegalStateException e) {
+    } catch (IllegalArgumentException e) {
       assertTrue(e.getMessage().contains(
-          "invalid type INT64 for column 'a' in scan token, expected: STRING"));
+          "Unknown column"));
     }
+  }
 
-    // Add the column with the wrong nullability.
-    client.alterTable(
-        testTableName,
-        new AlterTableOptions().dropColumn("a")
-                               .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
-                                                          .nullable(true).build()));
+  /**
+   * Tests that it is possible to create a scan token, rename a column, and rehydrate a scanner from
+   * the scan token with the old column name.
+   */
+  @Test
+  public void testScanTokensConcurrentColumnRename() throws Exception {
+    Schema schema = getBasicSchema();
+    String oldColName = schema.getColumnByIndex(1).getName();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    client.createTable(testTableName, schema, createOptions);
+
+    KuduTable table = client.openTable(testTableName);
+
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(1, tokens.size());
+    KuduScanToken token = tokens.get(0);
+
+    // Rename a column.
+    String newColName = "new-name";
+    client.alterTable(testTableName, new AlterTableOptions().renameColumn(oldColName, newColName));
+
+    KuduScanner scanner = token.intoScanner(client);
+
+    // TODO(wdberkeley): Handle renaming a column between when the token is rehydrated as a scanner
+    //  and when the scanner first hits a replica. Note that this is almost certainly a very
+    //  short period of vulnerability.
+
+    assertEquals(0, countRowsInScan(scanner));
+
+    // Test that the old name cannot be used and the new name can be.
+    Schema alteredSchema = scanner.getProjectionSchema();
     try {
-      token.intoScanner(client);
+      alteredSchema.getColumn(oldColName);
       fail();
-    } catch (IllegalStateException e) {
-      assertTrue(e.getMessage().contains(
-          "invalid nullability for column 'a' in scan token, expected: NOT NULL"));
+    } catch (IllegalArgumentException ex) {
+      // Good.
     }
-
-    // Add the column with the correct type and nullability.
-    client.alterTable(
-        testTableName,
-        new AlterTableOptions().dropColumn("a")
-                               .addColumn(new ColumnSchema.ColumnSchemaBuilder("a", Type.INT64)
-                                                          .nullable(false)
-                                                          .defaultValue(0L).build()));
-    token.intoScanner(client);
+    alteredSchema.getColumn(newColName);
   }
 
   /**
@@ -357,4 +380,103 @@ public class TestScanToken {
       assertEquals(SCAN_REQUEST_TIMEOUT_MS, scanner.getScanRequestTimeout());
     }
   }
+
+  // Helper for scan token tests that use diff scan.
+  private long setupTableForDiffScans(KuduClient client,
+                                      KuduTable table,
+                                      int numRows) throws Exception {
+    KuduSession session = client.newSession();
+    for (int i = 0 ; i < numRows / 2; i++) {
+      session.apply(createBasicSchemaInsert(table, i));
+    }
+
+    // Grab the timestamp, then add more data so there's a diff.
+    long timestamp = client.getLastPropagatedTimestamp();
+    for (int i = numRows / 2; i < numRows; i++) {
+      session.apply(createBasicSchemaInsert(table, i));
+    }
+    // Delete some data so the is_deleted column can be tested.
+    for (int i = 0; i < numRows / 4; i++) {
+      Delete delete = table.newDelete();
+      PartialRow row = delete.getRow();
+      row.addInt(0, i);
+      session.apply(delete);
+    }
+
+    return timestamp;
+  }
+
+  // Helper to check diff scan results.
+  private void checkDiffScanResults(KuduScanner scanner,
+                                    int numExpectedMutations,
+                                    int numExpectedDeletes) throws KuduException {
+    int numMutations = 0;
+    int numDeletes = 0;
+    while (scanner.hasMoreRows()) {
+      for (RowResult rowResult : scanner.nextRows()) {
+        numMutations++;
+        if (rowResult.isDeleted()) numDeletes++;
+      }
+    }
+    assertEquals(numExpectedMutations, numMutations);
+    assertEquals(numExpectedDeletes, numDeletes);
+  }
+
+  /** Test that scan tokens work with diff scans. */
+  @Test
+  public void testDiffScanTokens() throws Exception {
+    Schema schema = getBasicSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    KuduTable table = client.createTable(testTableName, schema, createOptions);
+
+    // Set up the table for a diff scan.
+    int numRows = 20;
+    long timestamp = setupTableForDiffScans(client, table, numRows);
+
+    // Since the diff scan interval is [start, end), increment the start timestamp to exclude
+    // the last row inserted in the first group of ops, and increment the end timestamp to include
+    // the last row deleted in the second group of ops.
+    List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
+        .diffScan(timestamp + 1, client.getLastPropagatedTimestamp() + 1)
+        .build();
+    assertEquals(1, tokens.size());
+
+    checkDiffScanResults(tokens.get(0).intoScanner(client), 3 * numRows / 4, numRows / 4);
+  }
+
+  /** Test that scan tokens work with diff scans even when columns are renamed. */
+  @Test
+  public void testDiffScanTokensConcurrentColumnRename() throws Exception {
+    Schema schema = getBasicSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    createOptions.setRangePartitionColumns(ImmutableList.of());
+    createOptions.setNumReplicas(1);
+    KuduTable table = client.createTable(testTableName, schema, createOptions);
+
+    // Set up the table for a diff scan.
+    int numRows = 20;
+    long timestamp = setupTableForDiffScans(client, table, numRows);
+
+    // Since the diff scan interval is [start, end), increment the start timestamp to exclude
+    // the last row inserted in the first group of ops, and increment the end timestamp to include
+    // the last row deleted in the second group of ops.
+    List<KuduScanToken> tokens = client.newScanTokenBuilder(table)
+        .diffScan(timestamp + 1, client.getLastPropagatedTimestamp() + 1)
+        .build();
+    assertEquals(1, tokens.size());
+
+    // Rename a column between when the token is created and when it is rehydrated into a scanner
+    client.alterTable(table.getName(),
+                      new AlterTableOptions().renameColumn("column1_i", "column1_i_new"));
+
+    KuduScanner scanner = tokens.get(0).intoScanner(client);
+
+    // TODO(wdberkeley): Handle renaming a column between when the token is rehydrated as a scanner
+    //  and when the scanner first hits a replica. Note that this is almost certainly a very
+    //  short period of vulnerability.
+
+    checkDiffScanResults(scanner, 3 * numRows / 4, numRows / 4);
+  }
 }