You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by al...@apache.org on 2022/02/11 16:18:50 UTC
[kudu] branch branch-1.16.x updated: [java] KUDU-3349 Fix the failure to demote a leader
This is an automated email from the ASF dual-hosted git repository.
alexey pushed a commit to branch branch-1.16.x
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/branch-1.16.x by this push:
new 5428ea8 [java] KUDU-3349 Fix the failure to demote a leader
5428ea8 is described below
commit 5428ea81612a253266e779c903ec49f98febbb07
Author: Hongjiang Zhang <ho...@ebay.com>
AuthorDate: Fri Jan 21 15:27:58 2022 +0800
[java] KUDU-3349 Fix the failure to demote a leader
KuduScanToken gets a wrong tserver's uuid whose format is something
like: '<ByteString@6dffd497 size=32 contents="fc07f681d3ea4bab9bc5ec8090ab9437">',
the expected uuid should be "fc07f681d3ea4bab9bc5ec8090ab9437".
This issue caused RemoteTablet to fail to demote a leader, and the java
client always sends write ops to the demoted leader. As a result, there
are a lot of "PendingErrors overflowed. Failed to write at least 1000 rows to Kudu".
After this fix, the write ops, especially the deleting, will be faster.
Change-Id: I2974b6ec2cec2f0120b113d1bcf89fe3793a1ec5
Reviewed-on: http://gerrit.cloudera.org:8080/18166
Tested-by: Kudu Jenkins
Reviewed-by: Alexey Serbin <as...@cloudera.com>
(cherry picked from commit 90895ce76590f10730ad7aac3613b69d89ff5422)
Reviewed-on: http://gerrit.cloudera.org:8080/18222
Tested-by: Alexey Serbin <as...@cloudera.com>
Reviewed-by: Attila Bukor <ab...@apache.org>
---
.../java/org/apache/kudu/client/KuduScanToken.java | 64 ++++++----
.../java/org/apache/kudu/client/TestScanToken.java | 132 +++++++++++++++++++++
2 files changed, 172 insertions(+), 24 deletions(-)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
index f864f98..e864089 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduScanToken.java
@@ -205,6 +205,44 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
return columns;
}
+ /**
+ * create a new RemoteTablet from TabletMetadata
+ * @param tabletMetadata the tablet metadata
+ * @param tableId the table Id
+ * @param partition the partition
+ * @return a RemoteTablet object
+ */
+ public static RemoteTablet newRemoteTabletFromTabletMetadata(
+ Client.TabletMetadataPB tabletMetadata,
+ String tableId,
+ Partition partition) {
+ List<LocatedTablet.Replica> replicas = new ArrayList<>();
+ for (Client.TabletMetadataPB.ReplicaMetadataPB replicaMetadataPB :
+ tabletMetadata.getReplicasList()) {
+ Client.ServerMetadataPB server =
+ tabletMetadata.getTabletServers(replicaMetadataPB.getTsIdx());
+ LocatedTablet.Replica replica = new LocatedTablet.Replica(
+ server.getRpcAddresses(0).getHost(),
+ server.getRpcAddresses(0).getPort(),
+ replicaMetadataPB.getRole(), replicaMetadataPB.getDimensionLabel());
+ replicas.add(replica);
+ }
+
+ List<ServerInfo> servers = new ArrayList<>();
+ for (Client.ServerMetadataPB serverMetadataPB : tabletMetadata.getTabletServersList()) {
+ HostAndPort hostPort =
+ ProtobufHelper.hostAndPortFromPB(serverMetadataPB.getRpcAddresses(0));
+ final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
+ ServerInfo serverInfo = new ServerInfo(serverMetadataPB.getUuid().toStringUtf8(),
+ hostPort, inetAddress, serverMetadataPB.getLocation());
+ servers.add(serverInfo);
+ }
+
+ RemoteTablet remoteTablet = new RemoteTablet(tableId,
+ tabletMetadata.getTabletId(), partition, replicas, servers);
+ return remoteTablet;
+ }
+
@SuppressWarnings("deprecation")
private static KuduScanner.KuduScannerBuilder pbIntoScannerBuilder(
ScanTokenPB message, KuduClient client) throws KuduException {
@@ -226,30 +264,8 @@ public class KuduScanToken implements Comparable<KuduScanToken> {
TableLocationsCache tableLocationsCache =
client.asyncClient.getOrCreateTableLocationsCache(table.getTableId());
- List<LocatedTablet.Replica> replicas = new ArrayList<>();
- for (Client.TabletMetadataPB.ReplicaMetadataPB replicaMetadataPB :
- tabletMetadata.getReplicasList()) {
- Client.ServerMetadataPB server =
- tabletMetadata.getTabletServers(replicaMetadataPB.getTsIdx());
- LocatedTablet.Replica replica = new LocatedTablet.Replica(
- server.getRpcAddresses(0).getHost(),
- server.getRpcAddresses(0).getPort(),
- replicaMetadataPB.getRole(), replicaMetadataPB.getDimensionLabel());
- replicas.add(replica);
- }
-
- List<ServerInfo> servers = new ArrayList<>();
- for (Client.ServerMetadataPB serverMetadataPB : tabletMetadata.getTabletServersList()) {
- HostAndPort hostPort =
- ProtobufHelper.hostAndPortFromPB(serverMetadataPB.getRpcAddresses(0));
- final InetAddress inetAddress = NetUtil.getInetAddress(hostPort.getHost());
- ServerInfo serverInfo = new ServerInfo(serverMetadataPB.getUuid().toString(),
- hostPort, inetAddress, serverMetadataPB.getLocation());
- servers.add(serverInfo);
- }
-
- RemoteTablet remoteTablet = new RemoteTablet(table.getTableId(),
- tabletMetadata.getTabletId(), partition, replicas, servers);
+ RemoteTablet remoteTablet =
+ newRemoteTabletFromTabletMetadata(tabletMetadata, table.getTableId(), partition);
tableLocationsCache.cacheTabletLocations(Collections.singletonList(remoteTablet),
partition.partitionKeyStart, 1, tabletMetadata.getTtlMillis());
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
index 87e3128..3ec8f17 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestScanToken.java
@@ -32,6 +32,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -41,6 +42,8 @@ import java.util.Set;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.protobuf.CodedInputStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
@@ -51,6 +54,7 @@ import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.test.KuduTestHarness;
+import org.apache.kudu.test.cluster.KuduBinaryLocator;
import org.apache.kudu.test.cluster.MiniKuduCluster;
public class TestScanToken {
@@ -138,6 +142,104 @@ public class TestScanToken {
}
/**
+ * Regression test for KUDU-3349
+ */
+ @Test
+ public void testScanTokenWithWrongUuidSerialization() throws Exception {
+ // Prepare the table for testing.
+ Schema schema = createManyStringsSchema();
+ CreateTableOptions createOptions = new CreateTableOptions();
+ final int buckets = 8;
+ createOptions.addHashPartitions(ImmutableList.of("key"), buckets);
+ client.createTable(testTableName, schema, createOptions);
+
+ KuduSession session = client.newSession();
+ KuduTable table = client.openTable(testTableName);
+ final int totalRows = 100;
+ for (int i = 0; i < totalRows; i++) {
+ Insert insert = table.newInsert();
+ PartialRow row = insert.getRow();
+ row.addString("key", String.format("key_%02d", i));
+ row.addString("c1", "c1_" + i);
+ row.addString("c2", "c2_" + i);
+ assertEquals(session.apply(insert).hasRowError(), false);
+ }
+ KuduScanToken.KuduScanTokenBuilder tokenBuilder = client.newScanTokenBuilder(table);
+ tokenBuilder.setProjectedColumnIndexes(ImmutableList.of());
+ List<KuduScanToken> tokens = tokenBuilder.build();
+ assertEquals(buckets, tokens.size());
+
+ // Create a new client, open the newly created kudu table, and new scanners.
+ AsyncKuduClient newAsyncClient = new AsyncKuduClient.AsyncKuduClientBuilder(
+ harness.getMasterAddressesAsString())
+ .build();
+ KuduClient newClient = newAsyncClient.syncClient();
+ KuduTable newTable = newClient.openTable(testTableName);
+ List<KuduScanner> kuduScanners = new ArrayList<>(buckets);
+ List<String> tabletIds = new ArrayList<>(buckets);
+ for (KuduScanToken token : tokens) {
+ tabletIds.add(new String(token.getTablet().getTabletId(),
+ java.nio.charset.StandardCharsets.UTF_8));
+ KuduScanner kuduScanner = token.intoScanner(newAsyncClient.syncClient());
+ kuduScanners.add(kuduScanner);
+ }
+
+ // Step down all tablet leaders.
+ KuduBinaryLocator.ExecutableInfo exeInfo = null;
+ try {
+ exeInfo = KuduBinaryLocator.findBinary("kudu");
+ } catch (FileNotFoundException e) {
+ LOG.error(e.getMessage());
+ fail();
+ }
+ for (String tabletId : tabletIds) {
+ List<String> commandLine = Lists.newArrayList(exeInfo.exePath(),
+ "tablet",
+ "leader_step_down",
+ harness.getMasterAddressesAsString(),
+ tabletId);
+ ProcessBuilder processBuilder = new ProcessBuilder(commandLine);
+ processBuilder.environment().putAll(exeInfo.environment());
+ // Step down the tablet leaders one by one after a fix duration.
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ LOG.error(e.getMessage());
+ }
+ }
+ // Delete all rows first through the new client.
+ KuduSession newSession = newClient.newSession();
+
+ for (int i = 0; i < totalRows; i++) {
+ Operation del = newTable.newDelete();
+ PartialRow row = del.getRow();
+ row.addString("key", String.format("key_%02d", i));
+ del.setRow(row);
+ OperationResponse response = newSession.apply(del);
+ assertEquals(response.hasRowError(), false);
+ }
+
+ // Insert all rows again through the new client.
+ for (int i = 0; i < totalRows; i++) {
+ Insert insert = newTable.newInsert();
+ PartialRow row = insert.getRow();
+ row.addString("key", String.format("key_%02d", i));
+ row.addString("c1", "c1_" + i);
+ row.addString("c2", "c2_" + i);
+ assertEquals(newSession.apply(insert).hasRowError(), false);
+ }
+
+ // Verify all the row count.
+ int rowCount = 0;
+ for (KuduScanner kuduScanner : kuduScanners) {
+ while (kuduScanner.hasMoreRows()) {
+ rowCount += kuduScanner.nextRows().numRows;
+ }
+ }
+ assertEquals(totalRows, rowCount);
+ }
+
+ /**
* Tests scan token creation and execution on a table with non-covering range partitions.
*/
@Test
@@ -695,6 +797,36 @@ public class TestScanToken {
}
/**
+ * Verify the deserialization of RemoteTablet from KuduScanToken.
+ * Regression test for KUDU-3349.
+ */
+ @Test
+ public void testRemoteTabletVerification() throws IOException {
+ final int NUM_ROWS_DESIRED = 100;
+ KuduTable table = createDefaultTable(client, testTableName);
+ loadDefaultTable(client, testTableName, NUM_ROWS_DESIRED);
+ KuduScanToken.KuduScanTokenBuilder builder =
+ new KuduScanToken.KuduScanTokenBuilder(asyncClient, table);
+ List<KuduScanToken> tokens = builder.build();
+ List<HostAndPort> tservers = harness.getTabletServers();
+ for (KuduScanToken token : tokens) {
+ byte[] serialized = token.serialize();
+ Client.ScanTokenPB scanTokenPB =
+ Client.ScanTokenPB.parseFrom(CodedInputStream.newInstance(serialized));
+ Client.TabletMetadataPB tabletMetadata = scanTokenPB.getTabletMetadata();
+ Partition partition =
+ ProtobufHelper.pbToPartition(tabletMetadata.getPartition());
+ RemoteTablet remoteTablet = KuduScanToken.newRemoteTabletFromTabletMetadata(tabletMetadata,
+ table.getTableId(), partition);
+ for (ServerInfo si : remoteTablet.getTabletServersCopy()) {
+ assertEquals(si.getUuid().length(), 32);
+ HostAndPort hostAndPort = si.getHostAndPort();
+ assertEquals(tservers.contains(hostAndPort), true);
+ }
+ }
+ }
+
+ /**
* Regression test for KUDU-3205.
*/
@Test