You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2022/02/01 21:54:33 UTC
[hbase] branch branch-2.5 updated: HBASE-26472 Adhere to semantic conventions regarding table data operations (addendum)
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new ec98659 HBASE-26472 Adhere to semantic conventions regarding table data operations (addendum)
ec98659 is described below
commit ec9865947cf21bffb8e1b65330b294f287930938
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Wed Jan 26 17:04:44 2022 -0800
HBASE-26472 Adhere to semantic conventions regarding table data operations (addendum)
Ensure table data operations emit one and only one span.
---
.../org/apache/hadoop/hbase/client/HTable.java | 104 +++++++++------------
.../hadoop/hbase/client/TestAsyncTableTracing.java | 10 +-
.../hadoop/hbase/client/TestHTableTracing.java | 10 +-
3 files changed, 57 insertions(+), 67 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 7b83fad..f57ca09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -433,33 +433,27 @@ public class HTable implements Table {
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
- .setTableName(tableName)
- .setOperation(HBaseSemanticAttributes.Operation.BATCH)
- .setContainerOperations(actions);
- TraceUtil.traceWithIOException(() -> {
- int rpcTimeout = writeRpcTimeoutMs;
- boolean hasRead = false;
- boolean hasWrite = false;
- for (Row action : actions) {
- if (action instanceof Mutation) {
- hasWrite = true;
- } else {
- hasRead = true;
- }
- if (hasRead && hasWrite) {
- break;
- }
- }
- if (hasRead && !hasWrite) {
- rpcTimeout = readRpcTimeoutMs;
+ int rpcTimeout = writeRpcTimeoutMs;
+ boolean hasRead = false;
+ boolean hasWrite = false;
+ for (Row action : actions) {
+ if (action instanceof Mutation) {
+ hasWrite = true;
+ } else {
+ hasRead = true;
}
- try {
- batch(actions, results, rpcTimeout);
- } catch (InterruptedException e) {
- throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ if (hasRead && hasWrite) {
+ break;
}
- }, supplier);
+ }
+ if (hasRead && !hasWrite) {
+ rpcTimeout = readRpcTimeoutMs;
+ }
+ try {
+ batch(actions, results, rpcTimeout);
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ }
}
public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
@@ -555,29 +549,23 @@ public class HTable implements Table {
@Override
public void delete(final List<Delete> deletes) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
- .setTableName(tableName)
- .setOperation(HBaseSemanticAttributes.Operation.BATCH)
- .setContainerOperations(deletes);
- TraceUtil.traceWithIOException(() -> {
- Object[] results = new Object[deletes.size()];
- try {
- batch(deletes, results, writeRpcTimeoutMs);
- } catch (InterruptedException e) {
- throw (InterruptedIOException) new InterruptedIOException().initCause(e);
- } finally {
- // TODO: to be consistent with batch put(), do not modify input list
- // mutate list so that it is empty for complete success, or contains only failed records
- // results are returned in the same order as the requests in list walk the list backwards,
- // so we can remove from list without impacting the indexes of earlier members
- for (int i = results.length - 1; i >= 0; i--) {
- // if result is not null, it succeeded
- if (results[i] instanceof Result) {
- deletes.remove(i);
- }
+ Object[] results = new Object[deletes.size()];
+ try {
+ batch(deletes, results, writeRpcTimeoutMs);
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ } finally {
+ // TODO: to be consistent with batch put(), do not modify input list
+ // mutate list so that it is empty for complete success, or contains only failed records
+ // results are returned in the same order as the requests in list walk the list backwards,
+ // so we can remove from list without impacting the indexes of earlier members
+ for (int i = results.length - 1; i >= 0; i--) {
+ // if result is not null, it succeeded
+ if (results[i] instanceof Result) {
+ deletes.remove(i);
}
}
- }, supplier);
+ }
}
@Override
@@ -605,21 +593,15 @@ public class HTable implements Table {
@Override
public void put(final List<Put> puts) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
- .setTableName(tableName)
- .setOperation(HBaseSemanticAttributes.Operation.BATCH)
- .setContainerOperations(puts);
- TraceUtil.traceWithIOException(() -> {
- for (Put put : puts) {
- validatePut(put);
- }
- Object[] results = new Object[puts.size()];
- try {
- batch(puts, results, writeRpcTimeoutMs);
- } catch (InterruptedException e) {
- throw (InterruptedIOException) new InterruptedIOException().initCause(e);
- }
- }, supplier);
+ for (Put put : puts) {
+ validatePut(put);
+ }
+ Object[] results = new Object[puts.size()];
+ try {
+ batch(puts, results, writeRpcTimeoutMs);
+ } catch (InterruptedException e) {
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+ }
}
@Override
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
index a2e4f5c..04df972 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
@@ -29,6 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -41,9 +42,11 @@ import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -253,11 +256,12 @@ public class TestAsyncTableTracing {
Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
"waiting for span to emit",
() -> traceRule.getSpans(), hasItem(spanLocator)));
- SpanData data = traceRule.getSpans()
+ List<SpanData> candidateSpans = traceRule.getSpans()
.stream()
.filter(spanLocator::matches)
- .findFirst()
- .orElseThrow(AssertionError::new);
+ .collect(Collectors.toList());
+ assertThat(candidateSpans, hasSize(1));
+ SpanData data = candidateSpans.iterator().next();
assertThat(data, allOf(
hasName(expectedName),
hasKind(SpanKind.CLIENT),
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
index 4b94ad9..a4adfe5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
@@ -29,6 +29,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.hasSize;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
@@ -42,8 +43,10 @@ import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
@@ -251,11 +254,12 @@ public class TestHTableTracing extends TestTracingBase {
Waiter.waitFor(conf, 1000, new MatcherPredicate<>(
"waiting for span to emit",
() -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
- SpanData data = TRACE_RULE.getSpans()
+ List<SpanData> candidateSpans = TRACE_RULE.getSpans()
.stream()
.filter(spanLocator::matches)
- .findFirst()
- .orElseThrow(AssertionError::new);
+ .collect(Collectors.toList());
+ assertThat(candidateSpans, hasSize(1));
+ SpanData data = candidateSpans.iterator().next();
assertThat(data, allOf(
hasName(expectedName),
hasKind(SpanKind.CLIENT),