You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/09/22 00:33:43 UTC
[1/3] kudu git commit: [java client] Improve and hide
OperationResponse#getWriteTimestamp
Repository: kudu
Updated Branches:
refs/heads/master 80923d690 -> 0ce5ba594
[java client] Improve and hide OperationResponse#getWriteTimestamp
That method was returning an HT-encoded timestamp, not a timestamp in microseconds.
It's also not meant for public consumption just yet, just like
AbstractKuduScannerBuilder#snapshotTimestampRaw.
Change-Id: I8cfc6fcc1d0607a94bb8be9e5a0d53a4987920af
Reviewed-on: http://gerrit.cloudera.org:8080/4487
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/519b2227
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/519b2227
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/519b2227
Branch: refs/heads/master
Commit: 519b2227d91936f10412a4fa509e4defe503d53c
Parents: 80923d6
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Sep 20 16:55:28 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Sep 22 00:33:14 2016 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/AbstractKuduScannerBuilder.java | 2 +-
.../main/java/org/apache/kudu/client/OperationResponse.java | 6 +++---
.../src/test/java/org/apache/kudu/client/TestHybridTime.java | 8 ++++----
3 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/519b2227/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
index 54753dd..ae7b5d4 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AbstractKuduScannerBuilder.java
@@ -220,7 +220,7 @@ public abstract class AbstractKuduScannerBuilder
* Sets a previously encoded HT timestamp as a snapshot timestamp, for tests. None is used by
* default.
* Requires that the ReadMode is READ_AT_SNAPSHOT.
- * @param htTimestamp a long representing a HybridClock-encoded timestamp
+ * @param htTimestamp a long representing a HybridTime-encoded timestamp
* @return this instance
* @throws IllegalArgumentException on build(), if the timestamp is less than 0 or if the
* read mode was not set to READ_AT_SNAPSHOT
http://git-wip-us.apache.org/repos/asf/kudu/blob/519b2227/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
index 794e341..93d1a63 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/OperationResponse.java
@@ -78,10 +78,10 @@ public class OperationResponse extends KuduRpcResponse {
/**
* Gives the write timestamp that was returned by the Tablet Server.
- * @return a timestamp in milliseconds, 0 if the external consistency mode set
- * in AsyncKuduSession wasn't CLIENT_PROPAGATED, or if the operation failed.
+ * @return a long representing a HybridTime-encoded timestamp
*/
- public long getWriteTimestamp() {
+ @InterfaceAudience.Private
+ public long getWriteTimestampRaw() {
return writeTimestamp;
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/519b2227/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
index a74594e..fdf18ba 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestHybridTime.java
@@ -93,8 +93,8 @@ public class TestHybridTime extends BaseKuduTest {
row.addString(schema.getColumnByIndex(0).getName(), keys[i]);
Deferred<OperationResponse> d = session.apply(insert);
OperationResponse response = d.join(DEFAULT_SLEEP);
- assertTrue(response.getWriteTimestamp() != 0);
- clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestamp());
+ assertTrue(response.getWriteTimestampRaw() != 0);
+ clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestampRaw());
LOG.debug("Clock value after write[" + i + "]: " + new Date(clockValues[0] / 1000).toString()
+ " Logical value: " + clockValues[1]);
// on the very first write we update the clock into the future
@@ -128,8 +128,8 @@ public class TestHybridTime extends BaseKuduTest {
1, responses.size());
OperationResponse response = responses.get(0);
- assertTrue(response.getWriteTimestamp() != 0);
- clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestamp());
+ assertTrue(response.getWriteTimestampRaw() != 0);
+ clockValues = HTTimestampToPhysicalAndLogical(response.getWriteTimestampRaw());
LOG.debug("Clock value after write[" + i + "]: " + new Date(clockValues[0] / 1000).toString()
+ " Logical value: " + clockValues[1]);
assertEquals(clockValues[0], previousPhysicalValue);
[2/3] kudu git commit: [java client] Fix an NPE in KuduException
Posted by jd...@apache.org.
[java client] Fix an NPE in KuduException
Saw this in a Jenkins run and also when running ITClient on my machine.
Change-Id: Iceddc6931e8d3a8cb807657fc5c0804f7052e48f
Reviewed-on: http://gerrit.cloudera.org:8080/4488
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Kudu Jenkins
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/8fc75a5c
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/8fc75a5c
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/8fc75a5c
Branch: refs/heads/master
Commit: 8fc75a5c654e100871316e61878b141df4707d0e
Parents: 519b222
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Sep 20 18:02:36 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Sep 22 00:33:20 2016 +0000
----------------------------------------------------------------------
.../src/main/java/org/apache/kudu/client/KuduException.java | 9 ++++++---
1 file changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/8fc75a5c/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
index 16b1072..f3afe92 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/KuduException.java
@@ -80,20 +80,23 @@ public abstract class KuduException extends IOException {
* @return a KuduException that's easier to handle
*/
static KuduException transformException(Exception e) {
+ // The message may be null.
+ String message = e.getMessage() == null ? "" : e.getMessage();
if (e instanceof KuduException) {
return (KuduException) e;
} else if (e instanceof DeferredGroupException) {
// TODO anything we can do to improve on that kind of exception?
} else if (e instanceof TimeoutException) {
- Status statusTimeout = Status.TimedOut(e.getMessage());
+ Status statusTimeout = Status.TimedOut(message);
return new NonRecoverableException(statusTimeout, e);
} else if (e instanceof InterruptedException) {
// Need to reset the interrupt flag since we caught it but aren't handling it.
Thread.currentThread().interrupt();
- Status statusAborted = Status.Aborted(e.getMessage());
+
+ Status statusAborted = Status.Aborted(message);
return new NonRecoverableException(statusAborted, e);
}
- Status status = Status.IOError(e.getMessage());
+ Status status = Status.IOError(message);
return new NonRecoverableException(status, e);
}
}
[3/3] kudu git commit: [java client] Few ITClient improvements
Posted by jd...@apache.org.
[java client] Few ITClient improvements
ITClient has been flaky for a while now, mostly due to the "Row count regressed"
issue. I fixed it by using snapshot timestamps, which made me refactor how we build
scanners, which made me add a new counting method in BaseKuduTest.
I continued running the test and saw other issues. Some unchecked errors were not
killing the test, so I added an UncaughtExceptionHandler. I also saw "Invalid call
sequence ID" scanner errors killing the test even though they are expected given how
this test runs, so they are now tolerated. Finally, I converted some plain Exceptions
into KuduExceptions, which gave us access to their Status.
Change-Id: I3b5ddca26b66e9fc1f737aaacf98df340f0b9024
Reviewed-on: http://gerrit.cloudera.org:8080/4489
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/0ce5ba59
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/0ce5ba59
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/0ce5ba59
Branch: refs/heads/master
Commit: 0ce5ba594412de4365625485ea7b3c1ee21bf28d
Parents: 8fc75a5
Author: Jean-Daniel Cryans <jd...@apache.org>
Authored: Tue Sep 20 18:20:01 2016 -0700
Committer: Jean-Daniel Cryans <jd...@apache.org>
Committed: Thu Sep 22 00:33:27 2016 +0000
----------------------------------------------------------------------
.../org/apache/kudu/client/BaseKuduTest.java | 19 +++--
.../java/org/apache/kudu/client/ITClient.java | 75 +++++++++++++++-----
2 files changed, 68 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/0ce5ba59/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
index 41015a4..8989f50 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/BaseKuduTest.java
@@ -140,13 +140,17 @@ public class BaseKuduTest {
data.addCallbacks(cb, defaultErrorCB);
data.join(DEFAULT_SLEEP);
}
-
- Deferred<RowResultIterator> closer = scanner.close();
- closer.addCallbacks(cb, defaultErrorCB);
- closer.join(DEFAULT_SLEEP);
return counter.get();
}
+ protected static int countRowsInScan(KuduScanner scanner) throws KuduException {
+ int counter = 0;
+ while (scanner.hasMoreRows()) {
+ counter += scanner.nextRows().getNumRows();
+ }
+ return counter;
+ }
+
/**
* Scans the table and returns the number of rows.
* @param table the table
@@ -155,17 +159,12 @@ public class BaseKuduTest {
*/
protected long countRowsInTable(KuduTable table, KuduPredicate... predicates)
throws KuduException {
- long count = 0;
KuduScanner.KuduScannerBuilder scanBuilder = syncClient.newScannerBuilder(table);
for (KuduPredicate predicate : predicates) {
scanBuilder.addPredicate(predicate);
}
scanBuilder.setProjectedColumnIndexes(ImmutableList.<Integer>of());
- KuduScanner scanner = scanBuilder.build();
- while (scanner.hasMoreRows()) {
- count += scanner.nextRows().getNumRows();
- }
- return count;
+ return countRowsInScan(scanBuilder.build());
}
protected List<String> scanTableToStrings(KuduTable table,
http://git-wip-us.apache.org/repos/asf/kudu/blob/0ce5ba59/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
----------------------------------------------------------------------
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
index 10cf231..c482a86 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITClient.java
@@ -60,6 +60,8 @@ public class ITClient extends BaseKuduTest {
private static KuduTable table;
private static long runtimeInSeconds;
+ private volatile long sharedWriteTimestamp;
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -89,6 +91,9 @@ public class ITClient extends BaseKuduTest {
@Test(timeout = TEST_TIMEOUT_SECONDS)
public void test() throws Exception {
+
+ UncaughtExceptionHandler uncaughtExceptionHandler = new UncaughtExceptionHandler();
+
ArrayList<Thread> threads = new ArrayList<>();
Thread chaosThread = new Thread(new ChaosThread());
Thread writerThread = new Thread(new WriterThread());
@@ -99,6 +104,7 @@ public class ITClient extends BaseKuduTest {
threads.add(scannerThread);
for (Thread thread : threads) {
+ thread.setUncaughtExceptionHandler(uncaughtExceptionHandler);
thread.start();
}
@@ -235,6 +241,7 @@ public class ITClient extends BaseKuduTest {
@Override
public void run() {
+ session.setExternalConsistencyMode(ExternalConsistencyMode.CLIENT_PROPAGATED);
while (KEEP_RUNNING_LATCH.getCount() > 0) {
try {
OperationResponse resp = session.apply(createBasicSchemaInsert(table, currentRowKey));
@@ -279,6 +286,13 @@ public class ITClient extends BaseKuduTest {
" returned this error: " + resp.getRowError(), null);
return true;
}
+
+ if (resp == null) {
+ return false;
+ }
+
+ sharedWriteTimestamp = resp.getWriteTimestampRaw();
+
return false;
}
}
@@ -299,8 +313,11 @@ public class ITClient extends BaseKuduTest {
boolean shouldContinue;
- // Always scan until we find rows.
- if (lastRowCount == 0 || random.nextBoolean()) {
+ // First check if we've written at least one row.
+ if (sharedWriteTimestamp == 0) {
+ shouldContinue = true;
+ } else if (lastRowCount == 0 || // Need to full scan once before random reading
+ random.nextBoolean()) {
shouldContinue = fullScan();
} else {
shouldContinue = randomGet();
@@ -322,14 +339,16 @@ public class ITClient extends BaseKuduTest {
}
/**
- * Reads a row at random that it knows to exist (smaller than lastRowCount).
- * @return
+ * Reads a row at random that should exist (smaller than lastRowCount).
+ * @return true if the get was successful, false if there was an error
*/
private boolean randomGet() {
int key = random.nextInt(lastRowCount);
KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
table.getSchema().getColumnByIndex(0), KuduPredicate.ComparisonOp.EQUAL, key);
- KuduScanner scanner = localClient.newScannerBuilder(table).addPredicate(predicate).build();
+ KuduScanner scanner = getScannerBuilder()
+ .addPredicate(predicate)
+ .build();
List<RowResult> results = new ArrayList<>();
while (scanner.hasMoreRows()) {
@@ -338,7 +357,7 @@ public class ITClient extends BaseKuduTest {
for (RowResult row : ite) {
results.add(row);
}
- } catch (Exception e) {
+ } catch (KuduException e) {
return checkAndReportError("Got error while getting row " + key, e);
}
}
@@ -357,25 +376,33 @@ public class ITClient extends BaseKuduTest {
}
/**
- * Rusn a full table scan and updates the lastRowCount.
- * @return
+ * Runs a full table scan and updates the lastRowCount.
+ * @return true if the full scan was successful, false if there was an error
*/
private boolean fullScan() {
- AsyncKuduScanner scannerBuilder = localAsyncClient.newScannerBuilder(table).build();
+ KuduScanner scanner = getScannerBuilder().build();
try {
- int rowCount = countRowsInScan(scannerBuilder);
+ int rowCount = countRowsInScan(scanner);
if (rowCount < lastRowCount) {
reportError("Row count regressed: " + rowCount + " < " + lastRowCount, null);
return false;
}
- lastRowCount = rowCount;
- LOG.info("New row count {}", lastRowCount);
- } catch (Exception e) {
- checkAndReportError("Got error while row counting", e);
+ if (rowCount > lastRowCount) {
+ lastRowCount = rowCount;
+ LOG.info("New row count {}", lastRowCount);
+ }
+ } catch (KuduException e) {
+ return checkAndReportError("Got error while row counting", e);
}
return true;
}
+ private KuduScanner.KuduScannerBuilder getScannerBuilder() {
+ return localClient.newScannerBuilder(table)
+ .readMode(AsyncKuduScanner.ReadMode.READ_AT_SNAPSHOT)
+ .snapshotTimestampRaw(sharedWriteTimestamp);
+ }
+
/**
* Checks the passed exception contains "Scanner not found". If it does then it returns true,
* else it reports the error and returns false.
@@ -384,12 +411,28 @@ public class ITClient extends BaseKuduTest {
* @param e the exception to check
* @return true if the scanner failed because it wasn't false, otherwise false
*/
- private boolean checkAndReportError(String message, Exception e) {
- if (!e.getCause().getMessage().contains("Scanner not found")) {
+ private boolean checkAndReportError(String message, KuduException e) {
+ // Do nasty things, expect nasty results. The scanners are a bit too happy to retry TS
+ // disconnections so we might end up retrying a scanner on a node that restarted, or we might
+ // get disconnected just after sending an RPC so when we reconnect to the same TS we might get
+ // the "Invalid call sequence ID" message.
+ if (!e.getStatus().isNotFound() &&
+ !e.getStatus().getMessage().contains("Invalid call sequence ID")) {
reportError(message, e);
return false;
}
return true;
}
}
+
+ class UncaughtExceptionHandler implements Thread.UncaughtExceptionHandler {
+
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ // Only report an error if we're still running, else we'll spam the log.
+ if (KEEP_RUNNING_LATCH.getCount() != 0) {
+ reportError("Uncaught exception", new Exception(e));
+ }
+ }
+ }
}
\ No newline at end of file