You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/09/22 00:33:43 UTC

[1/3] kudu git commit: [java client] Improve and hide OperationResponse#getWriteTimestamp

Repository: kudu
Updated Branches:
  refs/heads/master 80923d690 -> 0ce5ba594


[java client] Improve and hide OperationResponse#getWriteTimestamp

That method was returning an HT-encoded timestamp, not a timestamp in microseconds.
It's also not meant for public consumption just yet, just like
AbstractKuduScannerBuilder#snapshotTimestampRaw.

Change-Id: I8cfc6fcc1d0607a94bb8be9e5a0d53a4987920af
Reviewed-on: http://gerrit.cloudera.org:8080/4487
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/519b2227
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/519b2227
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/519b2227

Branch: refs/heads/master
Commit: 519b2227d91936f10412a4fa509e4defe503d53c
Parents: 80923d6
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Sep 20 16:55:28 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Sep 22 00:33:14 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/AbstractKuduScannerBuilder.java   | 2 +-
 .../main/java/org/apache/kudu/client/OperationResponse.java  | 6 +++---
 .../src/test/java/org/apache/kudu/client/TestHybridTime.java | 8 ++++----
 3 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/519b2227/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index 54753dd..ae7b5d4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -220,7 +220,7 @@ public abstract class AbstractKuduScannerBuilder
    * Sets a previously encoded HT timestamp as a snapshot timestamp, for tests. None is used by
    * default.
    * Requires that the ReadMode is READ_AT_SNAPSHOT.
-   * @param htTimestamp a long representing a HybridClock-encoded timestamp
+   * @param htTimestamp a long representing a HybridTime-encoded timestamp
    * @return this instance
    * @throws IllegalArgumentException on build(), if the timestamp is less than 0 or if the
    *                                  read mode was not set to READ_AT_SNAPSHOT

http://git-wip-us.apache.org/repos/asf/kudu/blob/519b2227/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
index 794e341..93d1a63 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
@@ -78,10 +78,10 @@ public class OperationResponse extends KuduRpcResponse {
 
   /**
    * Gives the write timestamp that was returned by the Tablet Server.
-   * @return a timestamp in milliseconds, 0 if the external consistency mode set
-   *         in AsyncKuduSession wasn't CLIENT_PROPAGATED, or if the operation failed.
+   * @return a long representing a HybridTime-encoded timestamp
    */
-  public long getWriteTimestamp() {
+  @InterfaceAudience.Private
+  public long getWriteTimestampRaw() {
     return writeTimestamp;
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/519b2227/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
index a74594e..fdf18ba 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
@@ -93,8 +93,8 @@ public class TestHybridTime extends BaseKuduTest {
       row.addString(schema.getColumnByIndex(0).getName(), keys[i]);
       Deferred<OperationResponse> d = session.apply(insert);
       OperationResponse response = d.join(DEFAULT_SLEEP);
-      assertTrue(response.getWriteTimestamp() != 0);
-      clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestamp());
+      assertTrue(response.getWriteTimestampRaw() != 0);
+      clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestampRaw());
       LOG.debug("Clock value after write[" + i + "]: " + new Date(clockValues[0] / 1000).toString()
         + " Logical value: " + clockValues[1]);
       // on the very first write we update the clock into the future
@@ -128,8 +128,8 @@ public class TestHybridTime extends BaseKuduTest {
         1, responses.size());
 
       OperationResponse response = responses.get(0);
-      assertTrue(response.getWriteTimestamp() != 0);
-      clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestamp());
+      assertTrue(response.getWriteTimestampRaw() != 0);
+      clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestampRaw());
       LOG.debug("Clock value after write[" + i + "]: " + new Date(clockValues[0] / 1000).toString()
         + " Logical value: " + clockValues[1]);
       assertEquals(clockValues[0], previousPhysicalValue);


[2/3] kudu git commit: [java client] Fix an NPE in KuduException

Posted by jd...@apache.org.
[java client] Fix an NPE in KuduException

Saw this in a Jenkins run and also running ITClient on my machine.

Change-Id: Iceddc6931e8d3a8cb807657fc5c0804f7052e48f
Reviewed-on: http://gerrit.cloudera.org:8080/4488
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8fc75a5c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8fc75a5c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8fc75a5c

Branch: refs/heads/master
Commit: 8fc75a5c654e100871316e61878b141df4707d0e
Parents: 519b222
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Sep 20 18:02:36 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Sep 22 00:33:20 2016 +0000

----------------------------------------------------------------------
 .../src/main/java/org/apache/kudu/client/KuduException.java | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/8fc75a5c/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
index 16b1072..f3afe92 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
@@ -80,20 +80,23 @@ public abstract class KuduException extends IOException {
    * @return a KuduException that's easier to handle
    */
   static KuduException transformException(Exception e) {
+    // The message may be null.
+    String message =  e.getMessage() == null ? "" : e.getMessage();
     if (e instanceof KuduException) {
       return (KuduException) e;
     } else if (e instanceof DeferredGroupException) {
       // TODO anything we can do to improve on that kind of exception?
     } else if (e instanceof TimeoutException) {
-      Status statusTimeout = Status.TimedOut(e.getMessage());
+      Status statusTimeout = Status.TimedOut(message);
       return new NonRecoverableException(statusTimeout, e);
     } else if (e instanceof InterruptedException) {
       // Need to reset the interrupt flag since we caught it but aren't handling it.
       Thread.currentThread().interrupt();
-      Status statusAborted = Status.Aborted(e.getMessage());
+
+      Status statusAborted = Status.Aborted(message);
       return new NonRecoverableException(statusAborted, e);
     }
-    Status status = Status.IOError(e.getMessage());
+    Status status = Status.IOError(message);
     return new NonRecoverableException(status, e);
   }
 }


[3/3] kudu git commit: [java client] Few ITClient improvements

Posted by jd...@apache.org.
[java client] Few ITClient improvements

ITClient has been flaky for a while now, mostly due to the "Row count regressed"
issue. I fixed it by using snapshot timestamps, which made me refactor how we build
scanners, which made me add a new counting method in BaseKuduTest.

I continued running the test and saw other issues. Some unchecked errors were not
killing the test, so I added an UncaughtExceptionHandler. I also saw invalid
scanner sequence ID errors killing the test, even though they are expected given how
this test runs, so those are now tolerated. Finally, I converted some plain Exceptions
into KuduExceptions, which gives us access to their Status.

Change-Id: I3b5ddca26b66e9fc1f737aaacf98df340f0b9024
Reviewed-on: http://gerrit.cloudera.org:8080/4489
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0ce5ba59
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0ce5ba59
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0ce5ba59

Branch: refs/heads/master
Commit: 0ce5ba594412de4365625485ea7b3c1ee21bf28d
Parents: 8fc75a5
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Sep 20 18:20:01 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Sep 22 00:33:27 2016 +0000

----------------------------------------------------------------------
 .../org/apache/kudu/client/BaseKuduTest.java    | 19 +++--
 .../java/org/apache/kudu/client/ITClient.java   | 75 +++++++++++++++-----
 2 files changed, 68 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/0ce5ba59/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 41015a4..8989f50 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -140,13 +140,17 @@ public class BaseKuduTest {
       data.addCallbacks(cb, defaultErrorCB);
       data.join(DEFAULT_SLEEP);
     }
-
-    Deferred<RowResultIterator> closer = scanner.close();
-    closer.addCallbacks(cb, defaultErrorCB);
-    closer.join(DEFAULT_SLEEP);
     return counter.get();
   }
 
+  protected static int countRowsInScan(KuduScanner scanner) throws KuduException {
+    int counter = 0;
+    while (scanner.hasMoreRows()) {
+      counter += scanner.nextRows().getNumRows();
+    }
+    return counter;
+  }
+
   /**
    * Scans the table and returns the number of rows.
    * @param table the table
@@ -155,17 +159,12 @@ public class BaseKuduTest {
    */
   protected long countRowsInTable(KuduTable table, KuduPredicate... predicates)
       throws KuduException {
-    long count = 0;
     KuduScanner.KuduScannerBuilder scanBuilder = syncClient.newScannerBuilder(table);
     for (KuduPredicate predicate : predicates) {
       scanBuilder.addPredicate(predicate);
     }
     scanBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
-    KuduScanner scanner = scanBuilder.build();
-    while (scanner.hasMoreRows()) {
-      count += scanner.nextRows().getNumRows();
-    }
-    return count;
+    return countRowsInScan(scanBuilder.build());
   }
 
   protected List<String> scanTableToStrings(KuduTable table,

http://git-wip-us.apache.org/repos/asf/kudu/blob/0ce5ba59/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 10cf231..c482a86 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -60,6 +60,8 @@ public class ITClient extends BaseKuduTest {
   private static KuduTable table;
   private static long runtimeInSeconds;
 
+  private volatile long sharedWriteTimestamp;
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
 
@@ -89,6 +91,9 @@ public class ITClient extends BaseKuduTest {
 
   @Test(timeout = TEST_TIMEOUT_SECONDS)
   public void test() throws Exception {
+
+    UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler();
+
     ArrayList<Thread> threads = new ArrayList<>();
     Thread chaosThread = new Thread(new ChaosThread());
     Thread writerThread = new Thread(new WriterThread());
@@ -99,6 +104,7 @@ public class ITClient extends BaseKuduTest {
     threads.add(scannerThread);
 
     for (Thread thread : threads) {
+      thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
       thread.start();
     }
 
@@ -235,6 +241,7 @@ public class ITClient extends BaseKuduTest {
 
     @Override
     public void run() {
+      session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);
       while (KEEP_RUNNING_LATCH.getCount() > 0) {
         try {
           OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRowKey));
@@ -279,6 +286,13 @@ public class ITClient extends BaseKuduTest {
             " returned this error: " + resp.getRowError(), null);
         return true;
       }
+
+      if (resp == null) {
+        return false;
+      }
+
+      sharedWriteTimestamp = resp.getWriteTimestampRaw();
+
       return false;
     }
   }
@@ -299,8 +313,11 @@ public class ITClient extends BaseKuduTest {
 
         boolean shouldContinue;
 
-        // Always scan until we find rows.
-        if (lastRowCount == 0 || random.nextBoolean()) {
+        // First check if we've written at least one row.
+        if (sharedWriteTimestamp == 0) {
+          shouldContinue = true;
+        } else if (lastRowCount == 0 || // Need to full scan once before random reading
+            random.nextBoolean()) {
           shouldContinue = fullScan();
         } else {
           shouldContinue = randomGet();
@@ -322,14 +339,16 @@ public class ITClient extends BaseKuduTest {
     }
 
     /**
-     * Reads a row at random that it knows to exist (smaller than lastRowCount).
-     * @return
+     * Reads a row at random that should exist (smaller than lastRowCount).
+     * @return true if the get was successful, false if there was an error
      */
     private boolean randomGet() {
       int key = random.nextInt(lastRowCount);
       KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
           table.getSchema().getColumnByIndex(0), KuduPredicate.ComparisonOp.EQUAL, key);
-      KuduScanner scanner = localClient.newScannerBuilder(table).addPredicate(predicate).build();
+      KuduScanner scanner = getScannerBuilder()
+          .addPredicate(predicate)
+          .build();
 
       List<RowResult> results = new ArrayList<>();
       while (scanner.hasMoreRows()) {
@@ -338,7 +357,7 @@ public class ITClient extends BaseKuduTest {
           for (RowResult row : ite) {
             results.add(row);
           }
-        } catch (Exception e) {
+        } catch (KuduException e) {
           return checkAndReportError("Got error while getting row " + key, e);
         }
       }
@@ -357,25 +376,33 @@ public class ITClient extends BaseKuduTest {
     }
 
     /**
-     * Rusn a full table scan and updates the lastRowCount.
-     * @return
+     * Runs a full table scan and updates the lastRowCount.
+     * @return true if the full scan was successful, false if there was an error
      */
     private boolean fullScan() {
-      AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
+      KuduScanner scanner = getScannerBuilder().build();
       try {
-        int rowCount = countRowsInScan(scannerBuilder);
+        int rowCount = countRowsInScan(scanner);
         if (rowCount < lastRowCount) {
           reportError("Row count regressed: " + rowCount + " < " + lastRowCount, null);
           return false;
         }
-        lastRowCount = rowCount;
-        LOG.info("New row count {}", lastRowCount);
-      } catch (Exception e) {
-        checkAndReportError("Got error while row counting", e);
+        if (rowCount > lastRowCount) {
+          lastRowCount = rowCount;
+          LOG.info("New row count {}", lastRowCount);
+        }
+      } catch (KuduException e) {
+        return checkAndReportError("Got error while row counting", e);
       }
       return true;
     }
 
+    private KuduScanner.KuduScannerBuilder getScannerBuilder() {
+      return localClient.newScannerBuilder(table)
+          .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+          .snapshotTimestampRaw(sharedWriteTimestamp);
+    }
+
     /**
      * Checks the passed exception contains "Scanner not found". If it does then it returns true,
      * else it reports the error and returns false.
@@ -384,12 +411,28 @@ public class ITClient extends BaseKuduTest {
      * @param e the exception to check
      * @return true if the scanner failed because it wasn't false, otherwise false
      */
-    private boolean checkAndReportError(String message, Exception e) {
-      if (!e.getCause().getMessage().contains("Scanner not found")) {
+    private boolean checkAndReportError(String message, KuduException e) {
+      // Do nasty things, expect nasty results. The scanners are a bit too happy to retry TS
+      // disconnections so we might end up retrying a scanner on a node that restarted, or we might
+      // get disconnected just after sending an RPC so when we reconnect to the same TS we might get
+      // the "Invalid call sequence ID" message.
+      if (!e.getStatus().isNotFound() &&
+          !e.getStatus().getMessage().contains("Invalid call sequence ID")) {
         reportError(message, e);
         return false;
       }
       return true;
     }
   }
+
+  class UncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+    @Override
+    public void uncaughtException(Thread t, Throwable e) {
+      // Only report an error if we're still running, else we'll spam the log.
+      if (KEEP_RUNNING_LATCH.getCount() != 0) {
+        reportError("Uncaught exception", new Exception(e));
+      }
+    }
+  }
 }
\ No newline at end of file