You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by la...@apache.org on 2022/05/09 02:49:10 UTC

[kudu] branch master updated: [java] Fix a scan bug which will read repetitive rows.

This is an automated email from the ASF dual-hosted git repository.

laiyingchun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new becef6270 [java] Fix a scan bug which will read repetitive rows.
becef6270 is described below

commit becef62700744e961d8450c39776f6d4a43ec145
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Fri Apr 15 23:41:40 2022 +0800

    [java] Fix a scan bug which will read repetitive rows.
    
    When isFaultTolerant is true, lastPrimaryKey is not updated in the
    response callback of the 2nd and subsequent ScanRequests.
    
    In a common scenario, when the tablet server hosting the leader replica
    restarts, the scanner resumes reading from the first ScanResponse's
    lastPrimaryKey, which returns some repetitive rows.
    
    Change-Id: I4d6be9df10c1a45cd971b52a0351028c1f5a023f
    Reviewed-on: http://gerrit.cloudera.org:8080/18420
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 .../org/apache/kudu/client/AsyncKuduScanner.java   |   8 ++
 .../kudu/client/RowErrorsAndOverflowStatus.java    |  15 +++
 .../apache/kudu/client/ITFaultTolerantScanner.java |  14 ++-
 .../apache/kudu/client/ITScannerMultiTablet.java   | 126 ++++++++++++++++++++-
 4 files changed, 160 insertions(+), 3 deletions(-)

diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 79e18f8b5..77e39b5be 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -690,6 +690,9 @@ public final class AsyncKuduScanner {
         @Override
         public RowResultIterator call(final Response resp) {
           numRowsReturned += resp.data.getNumRows();
+          if (isFaultTolerant && resp.lastPrimaryKey != null) {
+            lastPrimaryKey = resp.lastPrimaryKey;
+          }
           if (!resp.more) {  // We're done scanning this tablet.
             scanFinished();
             return resp.data;
@@ -818,6 +821,11 @@ public final class AsyncKuduScanner {
     } else {
       buf.append(", endPrimaryKey=<end>");
     }
+    if (lastPrimaryKey.length > 0) {
+      buf.append(", lastPrimaryKey=").append(Bytes.hex(lastPrimaryKey));
+    } else {
+      buf.append(", lastPrimaryKey=<last>");
+    }
     buf.append(')');
     return buf.toString();
   }
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/RowErrorsAndOverflowStatus.java b/java/kudu-client/src/main/java/org/apache/kudu/client/RowErrorsAndOverflowStatus.java
index 0a618f80b..31cae5bc7 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/RowErrorsAndOverflowStatus.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/RowErrorsAndOverflowStatus.java
@@ -49,4 +49,19 @@ public class RowErrorsAndOverflowStatus {
   public boolean isOverflowed() {
     return overflowed;
   }
+
+  @Override
+  public String toString() {
+    // Format: "rowErrors size: N, rowErrors: [e0, e1, ...], overflowed: B".
+    StringBuilder out = new StringBuilder("rowErrors size: ");
+    out.append(rowErrors.length).append(", rowErrors: [");
+    for (int idx = 0; idx < rowErrors.length; idx++) {
+      if (idx > 0) {
+        out.append(", ");
+      }
+      out.append(rowErrors[idx].toString());
+    }
+    out.append("], overflowed: ").append(overflowed);
+    return out.toString();
+  }
 }
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
index bb9791be3..089cfd304 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITFaultTolerantScanner.java
@@ -38,10 +38,22 @@ public class ITFaultTolerantScanner extends ITScannerMultiTablet {
    * of tablet scanning and verifies the scan results are as expected.
    */
   @Test(timeout = 100000)
-  public void testFaultTolerantScannerRestart() throws Exception {
+  public void testFaultTolerantScannerRestartFirstScanRequest() throws Exception {
     serverFaultInjection(true, true, false);
   }
 
+  /**
+   * Tests fault tolerant scanner by restarting the tserver in the middle
+   * of tablet scanning and verifies the scan results are as expected.
+   * Note: the fault is injected at the 2nd or a later ScanRequest
+   * rather than at the first scan request.
+   * @throws Exception if the delegated fault-injection scenario throws
+   */
+  @Test(timeout = 100000)
+  public void testFaultTolerantScannerRestartAfterSecondScanRequest() throws Exception {
+    serverFaultInjectionRestartAfterSecondScanRequest();
+  }
+
   /**
    * Tests fault tolerant scanner by killing the tablet server in the middle
    * of tablet scanning and verifies the scan results are as expected.
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index c3c2d2d01..16a948693 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -21,11 +21,23 @@ import static org.apache.kudu.test.ClientTestUtil.getBasicSchema;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
 
 import com.google.common.collect.Lists;
 import org.junit.Before;
 import org.junit.Rule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.kudu.Schema;
 import org.apache.kudu.test.KuduTestHarness;
@@ -36,6 +48,7 @@ import org.apache.kudu.test.KuduTestHarness;
  */
 public class ITScannerMultiTablet {
 
+  private static final Logger LOG = LoggerFactory.getLogger(ITScannerMultiTablet.class);
   private static final String TABLE_NAME =
       ITScannerMultiTablet.class.getName() + "-" + System.currentTimeMillis();
   protected static final int ROW_COUNT = 20000;
@@ -58,16 +71,21 @@ public class ITScannerMultiTablet {
         TABLET_COUNT);
 
     table = harness.getClient().createTable(TABLE_NAME, schema, builder);
-
     KuduSession session = harness.getClient().newSession();
     session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
 
+    Set<Integer> primaryKeys = new HashSet<Integer>();
     // Getting meaty rows.
     char[] chars = new char[1024];
     for (int i = 0; i < ROW_COUNT; i++) {
       Insert insert = table.newInsert();
       PartialRow row = insert.getRow();
-      row.addInt(0, random.nextInt());
+      int id = random.nextInt();
+      while (id == Integer.MIN_VALUE || primaryKeys.contains(id)) {
+        id = random.nextInt();
+      }
+      row.addInt(0, id);
+      primaryKeys.add(id);
       row.addInt(1, i);
       row.addInt(2, i);
       row.addString(3, new String(chars));
@@ -75,6 +93,11 @@ public class ITScannerMultiTablet {
       session.apply(insert);
     }
     session.flush();
+    session.close();
+    // Log the error details, if any.
+    if (session.countPendingErrors() > 0) {
+      LOG.info("RowErrorsAndOverflowStatus: {}", session.getPendingErrors().toString());
+    }
     assertEquals(0, session.countPendingErrors());
   }
 
@@ -153,6 +176,105 @@ public class ITScannerMultiTablet {
     }
   }
 
+  /**
+   * Injects a failure (restarting a tablet server) while a fault tolerant scan is in
+   * progress, after at least the second scan request, to verify that the scanner
+   * continues scanning and returns the rows in increasing key order, without
+   * duplicated or lost rows.
+   *
+   * @throws Exception if building the scan tokens or running the scan tasks fails
+   */
+  void serverFaultInjectionRestartAfterSecondScanRequest() throws Exception {
+    // One scan token is expected per tablet, i.e. TABLET_COUNT tokens in total.
+    // Each token's scan is checked for strictly increasing keys (no duplicated rows),
+    // and the total row count is checked against ROW_COUNT (no lost rows).
+    // The scenario needs two or more scan requests,
+    // so batchSizeBytes is set to the minimum value of 1.
+    KuduScanToken.KuduScanTokenBuilder tokenBuilder = harness.getClient().newScanTokenBuilder(table)
+        .batchSizeBytes(1)
+        .setFaultTolerant(true)
+        .setProjectedColumnIndexes(Lists.newArrayList(0));
+
+    List<KuduScanToken> tokens = tokenBuilder.build();
+    assertEquals(TABLET_COUNT, tokens.size());
+
+    class TabletScannerTask implements Callable<Integer> {
+      private final KuduScanToken token;
+      private final boolean enableFaultInjection;
+
+      public TabletScannerTask(KuduScanToken token, boolean enableFaultInjection) {
+        this.token = token;
+        this.enableFaultInjection = enableFaultInjection;
+      }
+
+      @Override
+      public Integer call() {
+        int rowCount = 0;
+        KuduScanner scanner;
+        try {
+          scanner = this.token.intoScanner(harness.getClient());
+        } catch (IOException e) {
+          LOG.error("Generate KuduScanner error, {}", e.getMessage(), e);
+          return -1;
+        }
+        try {
+          int previousRow = Integer.MIN_VALUE;
+          boolean faultInjected = !this.enableFaultInjection;
+          int faultInjectionLowBound = (ROW_COUNT / TABLET_COUNT / 2);
+          while (scanner.hasMoreRows()) {
+            RowResultIterator rri = scanner.nextRows();
+            while (rri.hasNext()) {
+              int key = rri.next().getInt(0);
+              // Keys must be strictly increasing: a repeat means a duplicated row.
+              if (previousRow >= key) {
+                LOG.error("Impossible results, previousKey: {} >= currentKey: {}",
+                          previousRow, key);
+                return -1;
+              }
+              // Inject the fault only once, after enough rows have been read.
+              if (!faultInjected && rowCount > faultInjectionLowBound) {
+                harness.restartTabletServer(scanner.currentTablet());
+                faultInjected = true;
+              }
+              previousRow = key;
+              rowCount++;
+            }
+          }
+        } catch (Exception e) {
+          // Fall through and return the rows counted so far.
+          LOG.error("Scan error, {}", e.getMessage(), e);
+        } finally {
+          try {
+            scanner.close();
+          } catch (KuduException e) {
+            LOG.warn(e.getMessage(), e);
+          }
+        }
+        return rowCount;
+      }
+    }
+
+    int rowCount = 0;
+    ExecutorService threadPool = Executors.newFixedThreadPool(TABLET_COUNT);
+    List<TabletScannerTask> tabletScannerTasks = new ArrayList<>();
+    // Only the scan of the first token injects the fault.
+    tabletScannerTasks.add(new TabletScannerTask(tokens.get(0), true));
+    for (int i = 1; i < tokens.size(); i++) {
+      tabletScannerTasks.add(new TabletScannerTask(tokens.get(i), false));
+    }
+    List<Future<Integer>> results = threadPool.invokeAll(tabletScannerTasks);
+    threadPool.shutdown();
+    assertTrue(threadPool.awaitTermination(100, TimeUnit.SECONDS));
+    for (Future<Integer> result : results) {
+      try {
+        rowCount += result.get();
+      } catch (Exception e) {
+        throw new AssertionError("Scan task failed: " + e.getMessage(), e);
+      }
+    }
+    assertEquals(ROW_COUNT, rowCount);
+  }
+
   /**
    * Injecting failures (i.e. drop client connection) while scanning, to verify:
    * both non-fault tolerant scanner and fault tolerant scanner will continue scan as expected.