You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by zh...@apache.org on 2022/05/19 10:34:27 UTC
[kudu] branch master updated: [java] update lastPropagatedTimestamp and resourceMetrics in gotNextRow callback
This is an automated email from the ASF dual-hosted git repository.
zhangyifan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new 17fd93255 [java] update lastPropagatedTimestamp and resourceMetrics in gotNextRow callback
17fd93255 is described below
commit 17fd93255e124191c840702dab3eed961aadee7f
Author: shenxingwuying <sh...@gmail.com>
AuthorDate: Wed Apr 27 23:32:57 2022 +0800
[java] update lastPropagatedTimestamp and resourceMetrics in gotNextRow callback
As Andrew Wong said in https://gerrit.cloudera.org/18420,
Client should update lastPropagatedTimestamp and resourceMetrics
when client received the second or later scan response from tserver.
Change-Id: I26308754ca741276204cc95cee4f8e4a91dbf331
Reviewed-on: http://gerrit.cloudera.org:8080/18453
Tested-by: Kudu Jenkins
Reviewed-by: Yifan Zhang <ch...@163.com>
---
.../org/apache/kudu/client/AsyncKuduScanner.java | 21 +++++++++++++++++++++
.../apache/kudu/client/ITScannerMultiTablet.java | 17 +++++++++++++++++
2 files changed, 38 insertions(+)
diff --git a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
index 77e39b5be..49c188901 100644
--- a/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
+++ b/java/kudu-client/src/main/java/org/apache/kudu/client/AsyncKuduScanner.java
@@ -689,10 +689,31 @@ public final class AsyncKuduScanner {
new Callback<RowResultIterator, Response>() {
@Override
public RowResultIterator call(final Response resp) {
+ long lastPropagatedTimestamp = AsyncKuduClient.NO_TIMESTAMP;
+ if (readMode == ReadMode.READ_YOUR_WRITES &&
+ resp.scanTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+ // For READ_YOUR_WRITES mode, update the latest propagated timestamp
+ // with the chosen snapshot timestamp sent back from the server, to
+ // avoid unnecessarily wait for subsequent reads. Since as long as
+ // the chosen snapshot timestamp of the next read is greater than
+ // the previous one, the scan does not violate READ_YOUR_WRITES
+ // session guarantees.
+ lastPropagatedTimestamp = resp.scanTimestamp;
+ } else if (resp.propagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+ // Otherwise we just use the propagated timestamp returned from
+ // the server as the latest propagated timestamp.
+ lastPropagatedTimestamp = resp.propagatedTimestamp;
+ }
+ if (lastPropagatedTimestamp != AsyncKuduClient.NO_TIMESTAMP) {
+ client.updateLastPropagatedTimestamp(lastPropagatedTimestamp);
+ }
numRowsReturned += resp.data.getNumRows();
if (isFaultTolerant && resp.lastPrimaryKey != null) {
lastPrimaryKey = resp.lastPrimaryKey;
}
+ if (resp.resourceMetricsPb != null) {
+ resourceMetrics.update(resp.resourceMetricsPb);
+ }
if (!resp.more) { // We're done scanning this tablet.
scanFinished();
return resp.data;
diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
index 16a948693..1f61d2c73 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/ITScannerMultiTablet.java
@@ -53,6 +53,7 @@ public class ITScannerMultiTablet {
ITScannerMultiTablet.class.getName() + "-" + System.currentTimeMillis();
protected static final int ROW_COUNT = 20000;
protected static final int TABLET_COUNT = 3;
+ protected static final String METRIC_NAME = "total_duration_nanos";
private static Schema schema = getBasicSchema();
protected KuduTable table;
@@ -222,6 +223,12 @@ public class ITScannerMultiTablet {
int previousRow = Integer.MIN_VALUE;
boolean faultInjected = !this.enableFaultInjection;
int faultInjectionLowBound = (ROW_COUNT / TABLET_COUNT / 2);
+ boolean firstScanRequest = true;
+
+ long firstScannedMetric = 0;
+ long firstPropagatedTimestamp = 0;
+ long lastScannedMetric = 0;
+ long lastPropagatedTimestamp = 0;
while (scanner.hasMoreRows()) {
RowResultIterator rri = scanner.nextRows();
while (rri.hasNext()) {
@@ -234,11 +241,21 @@ public class ITScannerMultiTablet {
if (!faultInjected && rowCount > faultInjectionLowBound) {
harness.restartTabletServer(scanner.currentTablet());
faultInjected = true;
+ } else {
+ if (firstScanRequest) {
+ firstScannedMetric = scanner.getResourceMetrics().getMetric(METRIC_NAME);
+ firstPropagatedTimestamp = harness.getClient().getLastPropagatedTimestamp();
+ firstScanRequest = false;
+ }
+ lastScannedMetric = scanner.getResourceMetrics().getMetric(METRIC_NAME);
+ lastPropagatedTimestamp = harness.getClient().getLastPropagatedTimestamp();
}
previousRow = key;
rowCount++;
}
}
+ assertTrue(lastScannedMetric != firstScannedMetric);
+ assertTrue(lastPropagatedTimestamp > firstPropagatedTimestamp);
} catch (Exception e) {
LOG.error("Scan error, {}", e.getMessage());
e.printStackTrace();