You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/02/11 16:18:50 UTC

[kudu] branch branch-1.16.x updated: [java] KUDU-3349 Fix the failure to demote a leader

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch branch-1.16.x
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/branch-1.16.x by this push:
     new 5428ea8  [java] KUDU-3349 Fix the failure to demote a leader
5428ea8 is described below

commit 5428ea81612a253266e779c903ec49f98febbb07
Author: Hongjiang Zhang <ho...@ebay.com>
AuthorDate: Fri Jan 21 15:27:58 2022 +0800

    [java] KUDU-3349 Fix the failure to demote a leader
    
    KuduScanToken stored a wrong tserver UUID: the debug representation of
    the ByteString, such as
    '<ByteString@6dffd497 size=32 contents="fc07f681d3ea4bab9bc5ec8090ab9437">',
    instead of the expected "fc07f681d3ea4bab9bc5ec8090ab9437".
    
    This issue caused RemoteTablet to fail to demote a leader, so the Java
    client kept sending write ops to the demoted leader. As a result, there
    were a lot of "PendingErrors overflowed. Failed to write at least 1000 rows to Kudu"
    errors.
    
    After this fix, write ops, especially deletes, are faster.
    
    Change-Id: I2974b6ec2cec2f0120b113d1bcf89fe3793a1ec5
    Reviewed-on: http://gerrit.cloudera.org:8080/18166
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
    (cherry picked from commit 90895ce76590f10730ad7aac3613b69d89ff5422)
    Reviewed-on: http://gerrit.cloudera.org:8080/18222
    Tested-by: Alexey Serbin <as...@cloudera.com>
    Reviewed-by: Attila Bukor <ab...@apache.org>
---
 .../java/org/apache/kudu/client/KuduScanToken.java |  64 ++++++----
 .../java/org/apache/kudu/client/TestScanToken.java | 132 +++++++++++++++++++++
 2 files changed, 172 insertions(+), 24 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index f864f98..e864089 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -205,6 +205,44 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
     return columns;
   }
 
+  /**
+   * Builds a RemoteTablet out of a TabletMetadataPB message.
+   * @param tabletMetadata protobuf message listing the tablet's replicas and tablet servers
+   * @param tableId ID of the table that owns the tablet
+   * @param partition partition passed on to the RemoteTablet
+   * @return the newly constructed RemoteTablet
+   */
+  public static RemoteTablet newRemoteTabletFromTabletMetadata(
+      Client.TabletMetadataPB tabletMetadata,
+      String tableId,
+      Partition partition) {
+    // One Replica per replica entry, located by the first RPC address of its server.
+    List<LocatedTablet.Replica> replicas = new ArrayList<>();
+    for (Client.TabletMetadataPB.ReplicaMetadataPB replicaPb :
+        tabletMetadata.getReplicasList()) {
+      Client.ServerMetadataPB host = tabletMetadata.getTabletServers(replicaPb.getTsIdx());
+      replicas.add(new LocatedTablet.Replica(
+          host.getRpcAddresses(0).getHost(),
+          host.getRpcAddresses(0).getPort(),
+          replicaPb.getRole(),
+          replicaPb.getDimensionLabel()));
+    }
+
+    // One ServerInfo per tablet server, identified by its UUID decoded as a UTF-8 string.
+    List<ServerInfo> servers = new ArrayList<>();
+    for (Client.ServerMetadataPB serverPb : tabletMetadata.getTabletServersList()) {
+      HostAndPort address = ProtobufHelper.hostAndPortFromPB(serverPb.getRpcAddresses(0));
+      InetAddress resolved = NetUtil.getInetAddress(address.getHost());
+      servers.add(new ServerInfo(
+          serverPb.getUuid().toStringUtf8(),
+          address,
+          resolved,
+          serverPb.getLocation()));
+    }
+
+    return new RemoteTablet(tableId, tabletMetadata.getTabletId(), partition, replicas, servers);
+  }
+
   @SuppressWarnings("deprecation")
   private static KuduScanner.KuduScannerBuilder pbIntoScannerBuilder(
       ScanTokenPB message, KuduClient client) throws KuduException {
@@ -226,30 +264,8 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
         TableLocationsCache tableLocationsCache =
             client.asyncClient.getOrCreateTableLocationsCache(table.getTableId());
 
-        List<LocatedTablet.Replica> replicas = new ArrayList<>();
-        for (Client.TabletMetadataPB.ReplicaMetadataPB replicaMetadataPB :
-            tabletMetadata.getReplicasList()) {
-          Client.ServerMetadataPB server =
-              tabletMetadata.getTabletServers(replicaMetadataPB.getTsIdx());
-          LocatedTablet.Replica replica = new LocatedTablet.Replica(
-              server.getRpcAddresses(0).getHost(),
-              server.getRpcAddresses(0).getPort(),
-              replicaMetadataPB.getRole(), replicaMetadataPB.getDimensionLabel());
-          replicas.add(replica);
-        }
-
-        List<ServerInfo> servers = new ArrayList<>();
-        for (Client.ServerMetadataPB serverMetadataPB : tabletMetadata.getTabletServersList()) {
-          HostAndPort hostPort =
-              ProtobufHelper.hostAndPortFromPB(serverMetadataPB.getRpcAddresses(0));
-          final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
-          ServerInfo serverInfo = new ServerInfo(serverMetadataPB.getUuid().toString(),
-              hostPort, inetAddress, serverMetadataPB.getLocation());
-          servers.add(serverInfo);
-        }
-
-        RemoteTablet remoteTablet = new RemoteTablet(table.getTableId(),
-            tabletMetadata.getTabletId(), partition, replicas, servers);
+        RemoteTablet remoteTablet =
+            newRemoteTabletFromTabletMetadata(tabletMetadata, table.getTableId(), partition);
 
         tableLocationsCache.cacheTabletLocations(Collections.singletonList(remoteTablet),
             partition.partitionKeyStart, 1, tabletMetadata.getTtlMillis());
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index 87e3128..3ec8f17 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -41,6 +42,8 @@ import java.util.Set;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.protobuf.CodedInputStream;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -51,6 +54,7 @@ import org.apache.kudu.ColumnSchema;
 import org.apache.kudu.Schema;
 import org.apache.kudu.Type;
 import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.KuduBinaryLocator;
 import org.apache.kudu.test.cluster.MiniKuduCluster;
 
 public class TestScanToken {
@@ -138,6 +142,104 @@ public class TestScanToken {
   }
 
   /**
+   * Regression test for KUDU-3349
+   */
+  @Test
+  public void testScanTokenWithWrongUuidSerialization() throws Exception {
+    // A second client builds its scanners from scan tokens; every tablet leader is then told
+    // to step down, after which that client must still delete, re-insert and scan all rows.
+    // Prepare the table for testing.
+    Schema schema = createManyStringsSchema();
+    CreateTableOptions createOptions = new CreateTableOptions();
+    final int buckets = 8;
+    createOptions.addHashPartitions(ImmutableList.of("key"), buckets);
+    client.createTable(testTableName, schema, createOptions);
+
+    KuduSession session = client.newSession();
+    KuduTable table = client.openTable(testTableName);
+    final int totalRows = 100;
+    for (int i = 0; i < totalRows; i++) {
+      Insert insert = table.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", String.format("key_%02d", i));
+      row.addString("c1", "c1_" + i);
+      row.addString("c2", "c2_" + i);
+      assertEquals(false, session.apply(insert).hasRowError());
+    }
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+    tokenBuilder.setProjectedColumnIndexes(ImmutableList.of());
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(buckets, tokens.size());
+
+    // Create a new client, open the newly created kudu table, and new scanners.
+    AsyncKuduClient newAsyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
+        harness.getMasterAddressesAsString()).build();
+    KuduClient newClient = newAsyncClient.syncClient();
+    KuduTable newTable = newClient.openTable(testTableName);
+    List<KuduScanner> kuduScanners = new ArrayList<>(buckets);
+    List<String> tabletIds = new ArrayList<>(buckets);
+    for (KuduScanToken token : tokens) {
+      tabletIds.add(new String(token.getTablet().getTabletId(),
+          java.nio.charset.StandardCharsets.UTF_8));
+      KuduScanner kuduScanner = token.intoScanner(newAsyncClient.syncClient());
+      kuduScanners.add(kuduScanner);
+    }
+
+    // Step down all tablet leaders.
+    KuduBinaryLocator.ExecutableInfo exeInfo = null;
+    try {
+      exeInfo = KuduBinaryLocator.findBinary("kudu");
+    } catch (FileNotFoundException e) {
+      LOG.error(e.getMessage());
+      fail();
+    }
+    for (String tabletId : tabletIds) {
+      List<String> commandLine = Lists.newArrayList(
+          exeInfo.exePath(), "tablet", "leader_step_down",
+          harness.getMasterAddressesAsString(), tabletId);
+      ProcessBuilder processBuilder = new ProcessBuilder(commandLine);
+      processBuilder.environment().putAll(exeInfo.environment());
+      // Launch the step-down command, then pause so leaders are demoted one at a time.
+      processBuilder.start();
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.error(e.getMessage());
+      }
+    }
+    // Delete all rows first through the new client.
+    KuduSession newSession = newClient.newSession();
+    for (int i = 0; i < totalRows; i++) {
+      Operation del = newTable.newDelete();
+      PartialRow row = del.getRow();
+      row.addString("key", String.format("key_%02d", i));
+      del.setRow(row);
+      OperationResponse response = newSession.apply(del);
+      assertEquals(false, response.hasRowError());
+    }
+
+    // Insert all rows again through the new client.
+    for (int i = 0; i < totalRows; i++) {
+      Insert insert = newTable.newInsert();
+      PartialRow row = insert.getRow();
+      row.addString("key", String.format("key_%02d", i));
+      row.addString("c1", "c1_" + i);
+      row.addString("c2", "c2_" + i);
+      assertEquals(false, newSession.apply(insert).hasRowError());
+    }
+
+    // Verify all the row count.
+    int rowCount = 0;
+    for (KuduScanner kuduScanner : kuduScanners) {
+      while (kuduScanner.hasMoreRows()) {
+        rowCount += kuduScanner.nextRows().numRows;
+      }
+    }
+    assertEquals(totalRows, rowCount);
+  }
+
+  /**
    * Tests scan token creation and execution on a table with non-covering range partitions.
    */
   @Test
@@ -695,6 +797,36 @@ public class TestScanToken {
   }
 
   /**
+   * Verify the deserialization of RemoteTablet from KuduScanToken.
+   * Regression test for KUDU-3349.
+   */
+  @Test
+  public void testRemoteTabletVerification() throws IOException {
+    final int NUM_ROWS_DESIRED = 100;
+    KuduTable table = createDefaultTable(client, testTableName);
+    loadDefaultTable(client, testTableName, NUM_ROWS_DESIRED);
+    KuduScanToken.KuduScanTokenBuilder builder =
+            new KuduScanToken.KuduScanTokenBuilder(asyncClient, table);
+    List<KuduScanToken> tokens = builder.build();
+    List<HostAndPort> tservers = harness.getTabletServers();
+    for (KuduScanToken token : tokens) {
+      // Parse the token back from its serialized bytes to reach the embedded tablet metadata.
+      byte[] serialized = token.serialize();
+      Client.ScanTokenPB scanTokenPB =
+          Client.ScanTokenPB.parseFrom(CodedInputStream.newInstance(serialized));
+      Client.TabletMetadataPB tabletMetadata = scanTokenPB.getTabletMetadata();
+      Partition partition = ProtobufHelper.pbToPartition(tabletMetadata.getPartition());
+      RemoteTablet remoteTablet = KuduScanToken.newRemoteTabletFromTabletMetadata(tabletMetadata,
+          table.getTableId(), partition);
+      for (ServerInfo si : remoteTablet.getTabletServersCopy()) {
+        // A bare UUID is 32 characters; the ByteString debug form (KUDU-3349) is longer.
+        assertEquals(32, si.getUuid().length());
+        assertTrue(tservers.contains(si.getHostAndPort()));
+      }
+    }
+  }
+
+  /**
    * Regression test for KUDU-3205.
    */
   @Test