You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2022/01/18 20:30:11 UTC
[hbase] branch branch-2 updated: HBASE-26474 Implement connection-level attributes (#4014)
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new d4f2b66 HBASE-26474 Implement connection-level attributes (#4014)
d4f2b66 is described below
commit d4f2b66a431907d75ebdbeb2e6999af39a350bd6
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Tue Jan 18 12:29:35 2022 -0800
HBASE-26474 Implement connection-level attributes (#4014)
Add support for `db.system`, `db.connection_string`, `db.user`.
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Huaxiang Sun <hu...@apache.org>
Co-authored-by: Josh Elser <jo...@gmail.com>
---
.../client/AbstractRpcBasedConnectionRegistry.java | 5 +
.../hadoop/hbase/client/AsyncConnectionImpl.java | 10 +-
.../hadoop/hbase/client/AsyncRegionLocator.java | 69 +++++++++----
.../hadoop/hbase/client/ClusterConnection.java | 14 ++-
.../hbase/client/ConnectionImplementation.java | 12 ++-
.../hadoop/hbase/client/ConnectionRegistry.java | 9 +-
.../apache/hadoop/hbase/client/HRegionLocator.java | 84 +++++++++++++---
.../org/apache/hadoop/hbase/client/HTable.java | 70 +++++++------
.../apache/hadoop/hbase/client/MasterRegistry.java | 12 +++
.../hadoop/hbase/client/RawAsyncTableImpl.java | 2 +-
.../hadoop/hbase/client/RpcConnectionRegistry.java | 22 +++-
.../hadoop/hbase/client/ZKConnectionRegistry.java | 7 ++
.../hbase/client/trace/ConnectionSpanBuilder.java | 106 +++++++++++++++++++
.../client/trace/TableOperationSpanBuilder.java | 24 +++--
.../hbase/client/trace/TableSpanBuilder.java | 99 ++++++++++++++++++
.../hbase/client/DoNothingConnectionRegistry.java | 5 +
.../client/TestAsyncRegionLocatorTracing.java | 112 ++++++++++++++-------
.../hadoop/hbase/client/TestAsyncTableTracing.java | 40 ++------
.../hadoop/hbase/client/TestHTableTracing.java | 8 +-
.../hbase/client/TestRegionLocatorTracing.java | 64 ++++++++++--
.../hadoop/hbase/client/TestTracingBase.java | 35 +++++++
.../client/trace/hamcrest/AttributesMatchers.java | 12 +++
.../hbase/client/trace/hamcrest/TraceTestUtil.java | 65 ++++++++++++
.../hbase/trace/HBaseSemanticAttributes.java | 5 +
.../org/apache/hadoop/hbase/trace/TraceUtil.java | 12 ---
.../hadoop/hbase/client/RegionServerRegistry.java | 5 +
26 files changed, 733 insertions(+), 175 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
index 54138d3..0ee374d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java
@@ -273,6 +273,11 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry
}
@Override
+ public String getConnectionString() {
+ return "unimplemented";
+ }
+
+ @Override
public void close() {
trace(() -> {
if (registryEndpointRefresher != null) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index e7a198c..d458c6c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -71,7 +71,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
-class AsyncConnectionImpl implements AsyncConnection {
+public class AsyncConnectionImpl implements AsyncConnection {
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
@@ -191,6 +191,14 @@ class AsyncConnectionImpl implements AsyncConnection {
return choreService;
}
+ public User getUser() {
+ return user;
+ }
+
+ public ConnectionRegistry getConnectionRegistry() {
+ return registry;
+ }
+
@Override
public Configuration getConfiguration() {
return conf;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 7e275a8..09cae35 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,24 +20,27 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.SERVER_NAME_KEY;
-import static org.apache.hadoop.hbase.trace.TraceUtil.createSpan;
-import static org.apache.hadoop.hbase.trace.TraceUtil.createTableSpan;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.trace.ConnectionSpanBuilder;
+import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -95,9 +98,12 @@ class AsyncRegionLocator {
return TableName.isMetaTableName(tableName);
}
- private <T> CompletableFuture<T> tracedLocationFuture(Supplier<CompletableFuture<T>> action,
- Function<T, List<String>> getRegionNames, TableName tableName, String methodName) {
- Span span = createTableSpan("AsyncRegionLocator." + methodName, tableName);
+ private <T> CompletableFuture<T> tracedLocationFuture(
+ Supplier<CompletableFuture<T>> action,
+ Function<T, List<String>> getRegionNames,
+ Supplier<Span> spanSupplier
+ ) {
+ final Span span = spanSupplier.get();
try (Scope scope = span.makeCurrent()) {
CompletableFuture<T> future = action.get();
FutureUtils.addListener(future, (resp, error) -> {
@@ -116,18 +122,30 @@ class AsyncRegionLocator {
}
}
- private List<String> getRegionName(RegionLocations locs) {
- List<String> names = new ArrayList<>();
- for (HRegionLocation loc : locs.getRegionLocations()) {
- if (loc != null) {
- names.add(loc.getRegion().getRegionNameAsString());
- }
+ static List<String> getRegionNames(RegionLocations locs) {
+ if (locs == null || locs.getRegionLocations() == null) {
+ return Collections.emptyList();
}
- return names;
+ return Arrays.stream(locs.getRegionLocations())
+ .filter(Objects::nonNull)
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getRegionNameAsString)
+ .collect(Collectors.toList());
+ }
+
+ static List<String> getRegionNames(HRegionLocation location) {
+ return Optional.ofNullable(location)
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getRegionNameAsString)
+ .map(Collections::singletonList)
+ .orElseGet(Collections::emptyList);
}
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
RegionLocateType type, boolean reload, long timeoutNs) {
+ final Supplier<Span> supplier = new TableSpanBuilder(conn)
+ .setName("AsyncRegionLocator.getRegionLocations")
+ .setTableName(tableName);
return tracedLocationFuture(() -> {
CompletableFuture<RegionLocations> future = isMeta(tableName) ?
metaRegionLocator.getRegionLocations(RegionReplicaUtil.DEFAULT_REPLICA_ID, reload) :
@@ -137,11 +155,14 @@ class AsyncRegionLocator {
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region locations for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "'");
- }, this::getRegionName, tableName, "getRegionLocations");
+ }, AsyncRegionLocator::getRegionNames, supplier);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
+ final Supplier<Span> supplier = new TableSpanBuilder(conn)
+ .setName("AsyncRegionLocator.getRegionLocation")
+ .setTableName(tableName);
return tracedLocationFuture(() -> {
// meta region can not be split right now so we always call the same method.
// Change it later if the meta table can have more than one regions.
@@ -172,8 +193,7 @@ class AsyncRegionLocator {
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs) +
"ms) waiting for region location for " + tableName + ", row='" +
Bytes.toStringBinary(row) + "', replicaId=" + replicaId);
- }, loc -> Arrays.asList(loc.getRegion().getRegionNameAsString()), tableName,
- "getRegionLocation");
+ }, AsyncRegionLocator::getRegionNames, supplier);
}
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
@@ -201,6 +221,9 @@ class AsyncRegionLocator {
}
void clearCache(TableName tableName) {
+ Supplier<Span> supplier = new TableSpanBuilder(conn)
+ .setName("AsyncRegionLocator.clearCache")
+ .setTableName(tableName);
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", tableName);
if (tableName.equals(META_TABLE_NAME)) {
@@ -208,24 +231,28 @@ class AsyncRegionLocator {
} else {
nonMetaRegionLocator.clearCache(tableName);
}
- }, () -> createTableSpan("AsyncRegionLocator.clearCache", tableName));
+ }, supplier);
}
void clearCache(ServerName serverName) {
+ Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
+ .setName("AsyncRegionLocator.clearCache")
+ .addAttribute(SERVER_NAME_KEY, serverName.getServerName());
TraceUtil.trace(() -> {
LOG.debug("Clear meta cache for {}", serverName);
metaRegionLocator.clearCache(serverName);
nonMetaRegionLocator.clearCache(serverName);
conn.getConnectionMetrics().ifPresent(MetricsConnection::incrMetaCacheNumClearServer);
- }, () -> createSpan("AsyncRegionLocator.clearCache").setAttribute(SERVER_NAME_KEY,
- serverName.getServerName()));
+ }, supplier);
}
void clearCache() {
+ Supplier<Span> supplier = new ConnectionSpanBuilder(conn)
+ .setName("AsyncRegionLocator.clearCache");
TraceUtil.trace(() -> {
metaRegionLocator.clearCache();
nonMetaRegionLocator.clearCache();
- }, "AsyncRegionLocator.clearCache");
+ }, supplier);
}
AsyncNonMetaRegionLocator getNonMetaRegionLocator() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index 76022596..2770561 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.backoff.ClientBackoffPolicy;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -175,8 +176,7 @@ public interface ClusterConnection extends Connection {
* question
* @throws IOException if a remote or network exception occurs
*/
- HRegionLocation locateRegion(final byte[] regionName)
- throws IOException;
+ HRegionLocation locateRegion(final byte[] regionName) throws IOException;
/**
* Gets the locations of all regions in the specified table, <i>tableName</i>.
@@ -335,4 +335,14 @@ public interface ClusterConnection extends Connection {
* Get the bootstrap node list of another region server.
*/
List<ServerName> getAllBootstrapNodes(ServerName regionServer) throws IOException;
+
+ /**
+ * Get the {@link User} associated with this connection. May be {@code null}.
+ */
+ User getUser();
+
+ /**
+ * Get the {@link ConnectionRegistry} used to orient this cluster.
+ */
+ ConnectionRegistry getConnectionRegistry();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 5fec87e..04ca5ee 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -166,7 +166,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.Updat
value="AT_OPERATION_SEQUENCE_ON_CONCURRENT_ABSTRACTION",
justification="Access to the conncurrent hash map is under a lock so should be fine.")
@InterfaceAudience.Private
-class ConnectionImplementation implements ClusterConnection, Closeable {
+public class ConnectionImplementation implements ClusterConnection, Closeable {
public static final String RETRIES_BY_SERVER_KEY = "hbase.client.retries.by.server";
private static final Logger LOG = LoggerFactory.getLogger(ConnectionImplementation.class);
@@ -513,6 +513,16 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return this.metrics;
}
+ @Override
+ public User getUser() {
+ return user;
+ }
+
+ @Override
+ public ConnectionRegistry getConnectionRegistry() {
+ return registry;
+ }
+
private ThreadPoolExecutor getBatchPool() {
if (batchPool == null) {
synchronized (this) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
index cd22d78..975d8df 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistry.java
@@ -29,7 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Internal use only.
*/
@InterfaceAudience.Private
-interface ConnectionRegistry extends Closeable {
+public interface ConnectionRegistry extends Closeable {
/**
* Get the location of meta region(s).
@@ -49,6 +49,13 @@ interface ConnectionRegistry extends Closeable {
CompletableFuture<ServerName> getActiveMaster();
/**
+ * Return the connection string associated with this registry instance. This value is
+ * informational, used for annotating traces. Values returned may not be valid for establishing a
+ * working cluster connection.
+ */
+ String getConnectionString();
+
+ /**
* Closes this instance and releases any system resources associated with it
*/
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
index 1558aa8..b62f090 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HRegionLocator.java
@@ -18,18 +18,28 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REGION_NAMES_KEY;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
/**
* An implementation of {@link RegionLocator}. Used to view region location information for a single
@@ -62,24 +72,34 @@ public class HRegionLocator implements RegionLocator {
@Override
public HRegionLocation getRegionLocation(byte[] row, int replicaId, boolean reload)
throws IOException {
- return TraceUtil.trace(() -> connection.locateRegion(tableName, row, !reload, true, replicaId)
- .getRegionLocation(replicaId), () -> TraceUtil
- .createTableSpan(getClass().getSimpleName() + ".getRegionLocation", tableName));
+ final Supplier<Span> supplier = new TableSpanBuilder(connection)
+ .setName("HRegionLocator.getRegionLocation")
+ .setTableName(tableName);
+ return tracedLocationFuture(
+ () -> connection.locateRegion(tableName, row, !reload, true, replicaId)
+ .getRegionLocation(replicaId),
+ AsyncRegionLocator::getRegionNames,
+ supplier);
}
@Override
public List<HRegionLocation> getRegionLocations(byte[] row, boolean reload) throws IOException {
- return TraceUtil.trace(() -> {
- RegionLocations locs =
- connection.locateRegion(tableName, row, !reload, true, RegionInfo.DEFAULT_REPLICA_ID);
- return Arrays.asList(locs.getRegionLocations());
- }, () -> TraceUtil
- .createTableSpan(getClass().getSimpleName() + ".getRegionLocations", tableName));
+ final Supplier<Span> supplier = new TableSpanBuilder(connection)
+ .setName("HRegionLocator.getRegionLocations")
+ .setTableName(tableName);
+ final RegionLocations locs = tracedLocationFuture(
+ () -> connection.locateRegion(tableName, row, !reload, true,
+ RegionInfo.DEFAULT_REPLICA_ID),
+ AsyncRegionLocator::getRegionNames, supplier);
+ return Arrays.asList(locs.getRegionLocations());
}
@Override
public List<HRegionLocation> getAllRegionLocations() throws IOException {
- return TraceUtil.trace(() -> {
+ final Supplier<Span> supplier = new TableSpanBuilder(connection)
+ .setName("HRegionLocator.getAllRegionLocations")
+ .setTableName(tableName);
+ return tracedLocationFuture(() -> {
ArrayList<HRegionLocation> regions = new ArrayList<>();
for (RegionLocations locations : listRegionLocations()) {
for (HRegionLocation location : locations.getRegionLocations()) {
@@ -88,15 +108,27 @@ public class HRegionLocator implements RegionLocator {
connection.cacheLocation(tableName, locations);
}
return regions;
- }, () -> TraceUtil
- .createTableSpan(getClass().getSimpleName() + ".getAllRegionLocations", tableName));
+ }, HRegionLocator::getRegionNames, supplier);
+ }
+
+ private static List<String> getRegionNames(List<HRegionLocation> locations) {
+ if (CollectionUtils.isEmpty(locations)) {
+ return Collections.emptyList();
+ }
+ return locations.stream()
+ .filter(Objects::nonNull)
+ .map(AsyncRegionLocator::getRegionNames)
+ .filter(Objects::nonNull)
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
}
@Override
public void clearRegionLocationCache() {
- TraceUtil.trace(() ->
- connection.clearRegionCache(tableName), () -> TraceUtil
- .createTableSpan(this.getClass().getSimpleName() + ".clearRegionLocationCache", tableName));
+ final Supplier<Span> supplier = new TableSpanBuilder(connection)
+ .setName("HRegionLocator.clearRegionLocationCache")
+ .setTableName(tableName);
+ TraceUtil.trace(() -> connection.clearRegionCache(tableName), supplier);
}
@Override
@@ -124,4 +156,26 @@ public class HRegionLocator implements RegionLocator {
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName);
return regions;
}
+
+  /**
+   * Run a synchronous location lookup inside a span and annotate the span with the names of the
+   * regions that were located.
+   * <p>
+   * The span is obtained from {@code spanSupplier}, made current for the duration of
+   * {@code action}, and always ended before this method returns or throws.
+   * @param action the lookup to execute
+   * @param getRegionNames extracts region names from the lookup result; an empty or {@code null}
+   *   list leaves the span without the region-names attribute
+   * @param spanSupplier provides the span that wraps the lookup
+   * @return whatever {@code action} returned
+   * @throws IOException if {@code action} throws it; the error is recorded on the span first
+   */
+  private <T> T tracedLocationFuture(
+    TraceUtil.IOExceptionCallable<T> action,
+    Function<T, List<String>> getRegionNames,
+    Supplier<Span> spanSupplier
+  ) throws IOException {
+    final Span span = spanSupplier.get();
+    try (Scope ignored = span.makeCurrent()) {
+      final T result = action.call();
+      final List<String> regionNames = getRegionNames.apply(result);
+      if (!CollectionUtils.isEmpty(regionNames)) {
+        span.setAttribute(REGION_NAMES_KEY, regionNames);
+      }
+      span.setStatus(StatusCode.OK);
+      return result;
+    } catch (IOException | RuntimeException e) {
+      // Unchecked failures must be recorded too; otherwise the span carries no error status.
+      TraceUtil.setError(span, e);
+      throw e;
+    } finally {
+      // End the span on every exit path so that it is never leaked.
+      span.end();
+    }
+  }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index f72b065..2f870b7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -29,6 +29,7 @@ import com.google.protobuf.Service;
import com.google.protobuf.ServiceException;
import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
import org.apache.hadoop.hbase.client.trace.TableOperationSpanBuilder;
+import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.TimeRange;
@@ -359,7 +361,7 @@ public class HTable implements Table {
@Override
public Result get(final Get get) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(get);
return TraceUtil.trace(() -> get(get, get.isCheckExistenceOnly()), supplier);
@@ -402,7 +404,7 @@ public class HTable implements Table {
@Override
public Result[] get(List<Get> gets) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
return TraceUtil.trace(() -> {
@@ -429,7 +431,7 @@ public class HTable implements Table {
@Override
public void batch(final List<? extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
TraceUtil.traceWithIOException(() -> {
@@ -468,7 +470,7 @@ public class HTable implements Table {
.setOperationTimeout(operationTimeoutMs)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.build();
- final Span span = new TableOperationSpanBuilder()
+ final Span span = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.build();
@@ -507,7 +509,7 @@ public class HTable implements Table {
.setRpcTimeout(writeTimeout)
.setSubmittedRows(AsyncProcessTask.SubmittedRows.ALL)
.build();
- final Span span = new TableOperationSpanBuilder()
+ final Span span = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH)
.build();
@@ -525,7 +527,7 @@ public class HTable implements Table {
@Override
public void delete(final Delete delete) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(delete);
TraceUtil.traceWithIOException(() -> {
@@ -547,7 +549,7 @@ public class HTable implements Table {
@Override
public void delete(final List<Delete> deletes) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
TraceUtil.traceWithIOException(() -> {
@@ -573,7 +575,7 @@ public class HTable implements Table {
@Override
public void put(final Put put) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(put);
TraceUtil.traceWithIOException(() -> {
@@ -596,7 +598,7 @@ public class HTable implements Table {
@Override
public void put(final List<Put> puts) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
TraceUtil.traceWithIOException(() -> {
@@ -614,7 +616,7 @@ public class HTable implements Table {
@Override
public Result mutateRow(final RowMutations rm) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
return TraceUtil.trace(() -> {
@@ -670,7 +672,7 @@ public class HTable implements Table {
@Override
public Result append(final Append append) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(append);
return TraceUtil.trace(() -> {
@@ -697,7 +699,7 @@ public class HTable implements Table {
@Override
public Result increment(final Increment increment) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(increment);
return TraceUtil.trace(() -> {
@@ -731,7 +733,7 @@ public class HTable implements Table {
public long incrementColumnValue(final byte [] row, final byte [] family,
final byte [] qualifier, final long amount, final Durability durability)
throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.INCREMENT);
return TraceUtil.trace(() -> {
@@ -769,7 +771,7 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final byte [] value, final Put put) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -782,7 +784,7 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final Put put) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -795,7 +797,7 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOperator op, final byte [] value, final Put put) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -807,7 +809,7 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final byte[] value, final Delete delete) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -820,7 +822,7 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -833,7 +835,7 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier,
final CompareOperator op, final byte[] value, final Delete delete) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -910,7 +912,7 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final byte [] value, final RowMutations rm) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -923,7 +925,7 @@ public class HTable implements Table {
@Deprecated
public boolean checkAndMutate(final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOperator op, final byte [] value, final RowMutations rm) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -933,7 +935,7 @@ public class HTable implements Table {
@Override
public CheckAndMutateResult checkAndMutate(CheckAndMutate checkAndMutate) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(checkAndMutate);
return TraceUtil.trace(() -> {
@@ -982,7 +984,7 @@ public class HTable implements Table {
@Override
public List<CheckAndMutateResult> checkAndMutate(List<CheckAndMutate> checkAndMutates)
throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
return TraceUtil.trace(() -> {
@@ -1040,7 +1042,7 @@ public class HTable implements Table {
@Override
public boolean exists(final Get get) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(get);
return TraceUtil.trace(() -> {
@@ -1052,7 +1054,7 @@ public class HTable implements Table {
@Override
public boolean[] exists(List<Get> gets) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.BATCH);
return TraceUtil.trace(() -> {
@@ -1108,6 +1110,10 @@ public class HTable implements Table {
@Override
public void close() throws IOException {
+ final Supplier<Span> supplier = new TableSpanBuilder(connection)
+ .setName("HTable.close")
+ .setTableName(tableName)
+ .setSpanKind(SpanKind.INTERNAL);
TraceUtil.traceWithIOException(() -> {
if (this.closed) {
return;
@@ -1126,7 +1132,7 @@ public class HTable implements Table {
}
}
this.closed = true;
- }, () -> TraceUtil.createTableSpan(getClass().getSimpleName() + ".close", tableName));
+ }, supplier);
}
// validate for well-formedness
@@ -1456,7 +1462,7 @@ public class HTable implements Table {
@Override
public boolean thenPut(Put put) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
@@ -1469,7 +1475,7 @@ public class HTable implements Table {
@Override
public boolean thenDelete(Delete delete) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
@@ -1481,7 +1487,7 @@ public class HTable implements Table {
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
@@ -1511,7 +1517,7 @@ public class HTable implements Table {
@Override
public boolean thenPut(Put put) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(() -> {
@@ -1523,7 +1529,7 @@ public class HTable implements Table {
@Override
public boolean thenDelete(Delete delete) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
@@ -1533,7 +1539,7 @@ public class HTable implements Table {
@Override
public boolean thenMutate(RowMutations mutation) throws IOException {
- final Supplier<Span> supplier = new TableOperationSpanBuilder()
+ final Supplier<Span> supplier = new TableOperationSpanBuilder(connection)
.setTableName(tableName)
.setOperation(HBaseSemanticAttributes.Operation.CHECK_AND_MUTATE);
return TraceUtil.trace(
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
index 64e389c..05773d0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java
@@ -87,9 +87,12 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
return masterAddrs;
}
+ private final String connectionString;
+
MasterRegistry(Configuration conf) throws IOException {
super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS,
MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES);
+ connectionString = getConnectionString(conf);
}
@Override
@@ -102,6 +105,15 @@ public class MasterRegistry extends AbstractRpcBasedConnectionRegistry {
return getMasters();
}
+ /**
+ * Returns the connection string that the constructor computed once from the client
+ * configuration via {@link #getConnectionString(Configuration)}.
+ */
+ @Override
+ public String getConnectionString() {
+ return connectionString;
+ }
+
+ /**
+ * Computes the connection string for a master-based registry by delegating to
+ * {@code getMasterAddr(conf)}. Package-private and static so that other registries in this
+ * package can reuse it without holding a {@code MasterRegistry} instance.
+ * @param conf the client configuration to read from.
+ * @return whatever {@code getMasterAddr(conf)} yields for {@code conf}.
+ * @throws UnknownHostException propagated unchanged from {@code getMasterAddr(conf)}.
+ */
+ static String getConnectionString(Configuration conf) throws UnknownHostException {
+ return getMasterAddr(conf);
+ }
+
/**
* Builds the default master address end point if it is not specified in the configuration.
* <p/>
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 5ed5e7e..c3cc1fb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -261,7 +261,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
private TableOperationSpanBuilder newTableOperationSpanBuilder() {
- return new TableOperationSpanBuilder().setTableName(tableName);
+ return new TableOperationSpanBuilder(conn).setTableName(tableName);
}
@Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
index 731d620..660d74e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
+import java.net.UnknownHostException;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
@@ -72,9 +73,23 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
private static final char ADDRS_CONF_SEPARATOR = ',';
+ private final String connectionString;
+
RpcConnectionRegistry(Configuration conf) throws IOException {
super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS,
MIN_SECS_BETWEEN_REFRESHES);
+ connectionString = buildConnectionString(conf);
+ }
+
+ /**
+ * Derives the connection string reported by this registry.
+ * <p>
+ * When {@code BOOTSTRAP_NODES} is configured with a non-blank value, that value is split on
+ * {@code ADDRS_CONF_SEPARATOR}, each entry is trimmed, and the entries are joined again with the
+ * same separator. Otherwise the string a {@link MasterRegistry} would report for the same
+ * configuration is used.
+ * @param conf the client configuration to read from.
+ * @return the normalized bootstrap node list, or the master-based connection string.
+ * @throws UnknownHostException propagated from {@link MasterRegistry#getConnectionString}.
+ */
+ private String buildConnectionString(Configuration conf) throws UnknownHostException {
+ final String bootstrapNodes = conf.get(BOOTSTRAP_NODES);
+ if (!StringUtils.isBlank(bootstrapNodes)) {
+ // Normalize the configured list: trim every entry, then re-join with the same separator.
+ final String separator = String.valueOf(ADDRS_CONF_SEPARATOR);
+ return Splitter.on(ADDRS_CONF_SEPARATOR)
+ .trimResults()
+ .splitToStream(bootstrapNodes)
+ .collect(Collectors.joining(separator));
+ }
+ // Nothing configured for bootstrap nodes: fall back to the master-based connection string.
+ return MasterRegistry.getConnectionString(conf);
+ }
@Override
@@ -91,6 +106,11 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry {
}
}
+ /**
+ * Returns the connection string that the constructor computed once from the client
+ * configuration via {@code buildConnectionString(conf)}.
+ */
+ @Override
+ public String getConnectionString() {
+ return connectionString;
+ }
+
private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) {
return resp.getServerNameList().stream().map(ProtobufUtil::toServerName)
.collect(Collectors.toSet());
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
index 6e94afe..abb9856 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java
@@ -248,6 +248,13 @@ class ZKConnectionRegistry implements ConnectionRegistry {
}
@Override
+ public String getConnectionString() {
+ // ZooKeeper connect string and base znode, joined by ':'.
+ return zk.getConnectString() + ":" + znodePaths.baseZNode;
+ }
+
+ @Override
public void close() {
zk.close();
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
new file mode 100644
index 0000000..93a9d8f
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.trace;
+
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_CONNECTION_STRING;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_SYSTEM;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_SYSTEM_VALUE;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_USER;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Construct {@link Span} instances originating from the client side of a connection.
+ * <p>
+ * Every span produced by this builder is of kind {@link SpanKind#CLIENT} and carries the
+ * connection-level attributes {@code DB_SYSTEM}, {@code DB_CONNECTION_STRING} and {@code DB_USER}
+ * taken from the connection supplied to the constructor. An attribute whose value is not available
+ * is omitted instead of being recorded as {@code null}.
+ */
+@InterfaceAudience.Private
+public class ConnectionSpanBuilder implements Supplier<Span> {
+
+ private String name;
+ private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
+
+ /**
+ * @param conn the connection whose attributes are attached to every span built by this instance.
+ */
+ public ConnectionSpanBuilder(final AsyncConnectionImpl conn) {
+ populateConnectionAttributes(attributes, conn);
+ }
+
+ /**
+ * Equivalent to {@link #build()}; lets this builder be handed to anything expecting a
+ * {@code Supplier<Span>}.
+ */
+ @Override
+ public Span get() {
+ return build();
+ }
+
+ /**
+ * @param name the name given to spans created by {@link #build()}.
+ * @return this builder, for call chaining.
+ */
+ public ConnectionSpanBuilder setName(final String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * Adds, or replaces, one attribute on the spans created by this builder.
+ * @param key the attribute key.
+ * @param value the attribute value; a {@code null} value is not forwarded to the span.
+ * @return this builder, for call chaining.
+ */
+ public <T> ConnectionSpanBuilder addAttribute(final AttributeKey<T> key, T value) {
+ attributes.put(key, value);
+ return this;
+ }
+
+ /**
+ * Starts a new {@link SpanKind#CLIENT} span with the configured name and attributes.
+ * @return the started span.
+ */
+ @SuppressWarnings("unchecked")
+ public Span build() {
+ final SpanBuilder builder = TraceUtil.getGlobalTracer()
+ .spanBuilder(name)
+ // TODO: what about clients embedded in Master/RegionServer/Gateways/&c?
+ .setSpanKind(SpanKind.CLIENT);
+ attributes.forEach((k, v) -> {
+ // The tracing API leaves the effect of a null attribute value undefined, so never pass one.
+ if (v != null) {
+ builder.setAttribute((AttributeKey<? super Object>) k, v);
+ }
+ });
+ return builder.startSpan();
+ }
+
+ /**
+ * Same contract as {@link #populateConnectionAttributes(Map, AsyncConnectionImpl)}, for the
+ * synchronous connection type.
+ * @param attributes the attributes map to be populated.
+ * @param conn the source of attribute values.
+ */
+ static void populateConnectionAttributes(
+ final Map<AttributeKey<?>, Object> attributes,
+ final ClusterConnection conn
+ ) {
+ populateAttributes(attributes, conn.getConnectionRegistry().getConnectionString(),
+ conn.getUser());
+ }
+
+ /**
+ * Static utility method that performs the primary logic of this builder. It is visible to other
+ * classes in this package so that other builders can use this functionality as a mix-in.
+ * @param attributes the attributes map to be populated.
+ * @param conn the source of attribute values.
+ */
+ static void populateConnectionAttributes(
+ final Map<AttributeKey<?>, Object> attributes,
+ final AsyncConnectionImpl conn
+ ) {
+ populateAttributes(attributes, conn.getConnectionRegistry().getConnectionString(),
+ conn.getUser());
+ }
+
+ /**
+ * Shared implementation behind both {@code populateConnectionAttributes} overloads.
+ * @param attributes the attributes map to be populated.
+ * @param connectionString value for {@code DB_CONNECTION_STRING}; skipped when {@code null}.
+ * @param user object whose {@code toString()} becomes {@code DB_USER}; skipped when it, or its
+ * string form, is {@code null}.
+ */
+ private static void populateAttributes(
+ final Map<AttributeKey<?>, Object> attributes,
+ final String connectionString,
+ final Object user
+ ) {
+ attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
+ if (connectionString != null) {
+ attributes.put(DB_CONNECTION_STRING, connectionString);
+ }
+ Optional.ofNullable(user)
+ .map(Object::toString)
+ .ifPresent(userName -> attributes.put(DB_USER, userName));
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java
index aaa5361..2b9314a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableOperationSpanBuilder.java
@@ -18,10 +18,7 @@
package org.apache.hadoop.hbase.client.trace;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME;
import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_OPERATION;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
@@ -32,7 +29,9 @@ import java.util.Map;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
import org.apache.hadoop.hbase.client.CheckAndMutate;
+import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
@@ -46,8 +45,8 @@ import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Construct {@link io.opentelemetry.api.trace.Span} instances originating from
- * "table operations" -- the verbs in our public API that interact with data in tables.
+ * Construct {@link Span} instances originating from "table operations" -- the verbs in our public
+ * API that interact with data in tables.
*/
@InterfaceAudience.Private
public class TableOperationSpanBuilder implements Supplier<Span> {
@@ -60,7 +59,16 @@ public class TableOperationSpanBuilder implements Supplier<Span> {
private TableName tableName;
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
- @Override public Span get() {
+ /**
+ * Creates a builder whose attributes start out with the connection-level attributes of
+ * {@code conn}, as populated by {@code ConnectionSpanBuilder.populateConnectionAttributes}.
+ * @param conn the synchronous connection that table operations are issued through.
+ */
+ public TableOperationSpanBuilder(final ClusterConnection conn) {
+ ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
+ }
+
+ /**
+ * Creates a builder whose attributes start out with the connection-level attributes of
+ * {@code conn}, as populated by {@code ConnectionSpanBuilder.populateConnectionAttributes}.
+ * @param conn the asynchronous connection that table operations are issued through.
+ */
+ public TableOperationSpanBuilder(final AsyncConnectionImpl conn) {
+ ConnectionSpanBuilder.populateConnectionAttributes(attributes, conn);
+ }
+
+ @Override
+ public Span get() {
return build();
}
@@ -84,9 +92,7 @@ public class TableOperationSpanBuilder implements Supplier<Span> {
public TableOperationSpanBuilder setTableName(final TableName tableName) {
this.tableName = tableName;
- attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString());
- attributes.put(DB_NAME, tableName.getNamespaceAsString());
- attributes.put(TABLE_KEY, tableName.getNameAsString());
+ TableSpanBuilder.populateTableNameAttributes(attributes, tableName);
return this;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableSpanBuilder.java
new file mode 100644
index 0000000..8973da6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/TableSpanBuilder.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.trace;
+
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_NAME;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.function.Supplier;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Builds {@link Span} instances for work that involves a data table.
+ * <p>
+ * Spans carry the connection attributes of the connection given at construction time and, once
+ * {@link #setTableName(TableName)} has been called, the table-name attributes as well. The span
+ * kind is {@link SpanKind#CLIENT} unless changed through {@link #setSpanKind(SpanKind)}.
+ */
+@InterfaceAudience.Private
+public class TableSpanBuilder implements Supplier<Span> {
+
+ private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
+ private SpanKind spanKind = SpanKind.CLIENT;
+ private String name;
+
+ /**
+ * @param conn the synchronous connection whose attributes are attached to built spans.
+ */
+ public TableSpanBuilder(ClusterConnection conn) {
+ ConnectionSpanBuilder.populateConnectionAttributes(this.attributes, conn);
+ }
+
+ /**
+ * @param conn the asynchronous connection whose attributes are attached to built spans.
+ */
+ public TableSpanBuilder(AsyncConnectionImpl conn) {
+ ConnectionSpanBuilder.populateConnectionAttributes(this.attributes, conn);
+ }
+
+ /**
+ * Supplier view of this builder; simply delegates to {@link #build()}.
+ */
+ @Override
+ public Span get() {
+ return this.build();
+ }
+
+ /**
+ * @param name the name to give spans created by this builder.
+ * @return this builder, for call chaining.
+ */
+ public TableSpanBuilder setName(final String name) {
+ this.name = name;
+ return this;
+ }
+
+ /**
+ * @param spanKind the kind to assign to created spans, replacing the {@code CLIENT} default.
+ * @return this builder, for call chaining.
+ */
+ public TableSpanBuilder setSpanKind(final SpanKind spanKind) {
+ this.spanKind = spanKind;
+ return this;
+ }
+
+ /**
+ * Records the namespace and table attributes derived from {@code tableName}.
+ * @param tableName the table that the traced work involves.
+ * @return this builder, for call chaining.
+ */
+ public TableSpanBuilder setTableName(final TableName tableName) {
+ populateTableNameAttributes(this.attributes, tableName);
+ return this;
+ }
+
+ /**
+ * Starts a new span using the configured name, span kind and attributes.
+ * @return the started span.
+ */
+ @SuppressWarnings("unchecked")
+ public Span build() {
+ // TODO: what about clients embedded in Master/RegionServer/Gateways, etc.?
+ final SpanBuilder spanBuilder = TraceUtil.getGlobalTracer()
+ .spanBuilder(this.name)
+ .setSpanKind(this.spanKind);
+ for (final Map.Entry<AttributeKey<?>, Object> attribute : this.attributes.entrySet()) {
+ spanBuilder.setAttribute(
+ (AttributeKey<? super Object>) attribute.getKey(), attribute.getValue());
+ }
+ return spanBuilder.startSpan();
+ }
+
+ /**
+ * Package-visible helper holding the core logic of this builder, so that sibling builders can
+ * reuse it as a mix-in. Writes the namespace under both {@code NAMESPACE_KEY} and
+ * {@code DB_NAME}, and the table name under {@code TABLE_KEY}.
+ * @param attributes the attributes map to be populated.
+ * @param tableName the source of attribute values.
+ */
+ static void populateTableNameAttributes(
+ final Map<AttributeKey<?>, Object> attributes,
+ final TableName tableName
+ ) {
+ attributes.put(NAMESPACE_KEY, tableName.getNamespaceAsString());
+ attributes.put(DB_NAME, tableName.getNamespaceAsString());
+ attributes.put(TABLE_KEY, tableName.getNameAsString());
+ }
+}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
index 4bd66877..3b792a5 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java
@@ -48,6 +48,11 @@ class DoNothingConnectionRegistry implements ConnectionRegistry {
}
@Override
+ public String getConnectionString() {
+ // Constant placeholder value returned by this test-only registry.
+ return "nothing";
+ }
+
+ @Override
public void close() {
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
index 4f4a29c..47e09f8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,13 +17,25 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertEquals;
-
+import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.hasItem;
+import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
-import java.util.List;
+import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -31,6 +43,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
@@ -38,24 +51,26 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
+import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncRegionLocatorTracing {
+ private static final Logger LOG = LoggerFactory.getLogger(TestAsyncRegionLocatorTracing.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncRegionLocatorTracing.class);
- private static Configuration CONF = HBaseConfiguration.create();
+ private static final Configuration CONF = HBaseConfiguration.create();
private AsyncConnectionImpl conn;
@@ -89,16 +104,35 @@ public class TestAsyncRegionLocatorTracing {
}
private SpanData waitSpan(String name) {
- Waiter.waitFor(CONF, 1000,
- () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
- return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
+ return waitSpan(hasName(name));
+ }
+
+ /**
+ * Waits until {@code traceRule} holds a finished span accepted by {@code matcher}, then returns
+ * that span.
+ * @param matcher describes the span of interest; it is combined with {@code hasEnded()} so that
+ * only ended spans qualify.
+ * @return the first span from {@code traceRule.getSpans()} satisfying the combined matcher.
+ * @throws AssertionError rethrown from the wait after the collected spans have been logged, or
+ * raised when no matching span is found after the wait returns.
+ */
+ private SpanData waitSpan(Matcher<SpanData> matcher) {
+ Matcher<SpanData> spanLocator = allOf(matcher, hasEnded());
+ try {
+ // NOTE(review): 1000 is presumably a timeout in milliseconds -- confirm against Waiter.waitFor.
+ Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
+ "waiting for span",
+ () -> traceRule.getSpans(), hasItem(spanLocator)));
+ } catch (AssertionError e) {
+ // Dump everything collected so far to make a failed wait diagnosable, then fail as before.
+ LOG.error("AssertionError while waiting for matching span. Span reservoir contains: {}",
+ traceRule.getSpans());
+ throw e;
+ }
+ // Re-read the spans and pick the first match; the wait above only checked for its presence.
+ return traceRule.getSpans()
+ .stream()
+ .filter(spanLocator::matches)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
+ }
@Test
public void testClearCache() {
conn.getLocator().clearCache();
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
- assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn)));
}
@Test
@@ -106,19 +140,22 @@ public class TestAsyncRegionLocatorTracing {
ServerName sn = ServerName.valueOf("127.0.0.1", 12345, System.currentTimeMillis());
conn.getLocator().clearCache(sn);
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
- assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
- assertEquals(sn.toString(), span.getAttributes().get(HBaseSemanticAttributes.SERVER_NAME_KEY));
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn),
+ hasAttributes(containsEntry("db.hbase.server.name", sn.getServerName()))));
}
@Test
public void testClearCacheTableName() {
conn.getLocator().clearCache(TableName.META_TABLE_NAME);
SpanData span = waitSpan("AsyncRegionLocator.clearCache");
- assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
- assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
- span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY));
- assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
- span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY));
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(TableName.META_TABLE_NAME)));
}
@Test
@@ -126,15 +163,14 @@ public class TestAsyncRegionLocatorTracing {
conn.getLocator().getRegionLocation(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
RegionLocateType.CURRENT, TimeUnit.SECONDS.toNanos(1)).join();
SpanData span = waitSpan("AsyncRegionLocator.getRegionLocation");
- assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
- assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
- span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY));
- assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
- span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY));
- List<String> regionNames = span.getAttributes().get(HBaseSemanticAttributes.REGION_NAMES_KEY);
- assertEquals(1, regionNames.size());
- assertEquals(locs.getDefaultRegionLocation().getRegion().getRegionNameAsString(),
- regionNames.get(0));
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(TableName.META_TABLE_NAME),
+ hasAttributes(
+ containsEntryWithStringValuesOf("db.hbase.regions",
+ locs.getDefaultRegionLocation().getRegion().getRegionNameAsString()))));
}
@Test
@@ -142,16 +178,16 @@ public class TestAsyncRegionLocatorTracing {
conn.getLocator().getRegionLocations(TableName.META_TABLE_NAME, HConstants.EMPTY_START_ROW,
RegionLocateType.CURRENT, false, TimeUnit.SECONDS.toNanos(1)).join();
SpanData span = waitSpan("AsyncRegionLocator.getRegionLocations");
- assertEquals(StatusCode.OK, span.getStatus().getStatusCode());
- assertEquals(TableName.META_TABLE_NAME.getNamespaceAsString(),
- span.getAttributes().get(HBaseSemanticAttributes.NAMESPACE_KEY));
- assertEquals(TableName.META_TABLE_NAME.getNameAsString(),
- span.getAttributes().get(HBaseSemanticAttributes.TABLE_KEY));
- List<String> regionNames = span.getAttributes().get(HBaseSemanticAttributes.REGION_NAMES_KEY);
- assertEquals(3, regionNames.size());
- for (int i = 0; i < 3; i++) {
- assertEquals(locs.getRegionLocation(i).getRegion().getRegionNameAsString(),
- regionNames.get(i));
- }
+ String[] expectedRegions = Arrays.stream(locs.getRegionLocations())
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getRegionNameAsString)
+ .toArray(String[]::new);
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(TableName.META_TABLE_NAME),
+ hasAttributes(
+ containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions)))));
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
index af7ee8b..05a8ec3 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java
@@ -17,24 +17,22 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
-import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasItem;
-import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
-import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
@@ -57,6 +55,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -99,7 +98,7 @@ public class TestAsyncTableTracing {
private ClientService.Interface stub;
- private AsyncConnection conn;
+ private AsyncConnectionImpl conn;
private AsyncTable<?> table;
@@ -197,8 +196,9 @@ public class TestAsyncTableTracing {
return null;
}
}).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any());
+ final User user = UserProvider.instantiate(CONF).getCurrent();
conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test",
- UserProvider.instantiate(CONF).getCurrent()) {
+ user) {
@Override
AsyncRegionLocator getLocator() {
@@ -236,22 +236,13 @@ public class TestAsyncTableTracing {
Closeables.close(conn, true);
}
- /**
- * All {@link Span}s generated from table data access operations over {@code tableName} should
- * include these attributes.
- */
- static Matcher<SpanData> buildBaseAttributesMatcher(TableName tableName) {
- return hasAttributes(allOf(
- containsEntry("db.name", tableName.getNamespaceAsString()),
- containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()),
- containsEntry("db.hbase.table", tableName.getNameAsString())));
- }
-
private void assertTrace(String tableOperation) {
assertTrace(tableOperation, new IsAnything<>());
}
private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
+ // N.B. keep this implementation in sync with the method of the same name in
+ // TestHTableTracing.
final TableName tableName = table.getName();
final Matcher<SpanData> spanLocator = allOf(
hasName(containsString(tableOperation)), hasEnded());
@@ -269,7 +260,8 @@ public class TestAsyncTableTracing {
hasName(expectedName),
hasKind(SpanKind.CLIENT),
hasStatusWithCode(StatusCode.OK),
- buildBaseAttributesMatcher(tableName),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(tableName),
matcher));
}
@@ -524,16 +516,4 @@ public class TestAsyncTableTracing {
table.batchAll(Arrays.asList(new Delete(Bytes.toBytes(0)))).join();
assertTrace("BATCH");
}
-
- @Test
- public void testConnClose() throws IOException {
- conn.close();
- Waiter.waitFor(CONF, 1000,
- () -> traceRule.getSpans().stream()
- .anyMatch(span -> span.getName().equals("AsyncConnection.close") &&
- span.getKind() == SpanKind.INTERNAL && span.hasEnded()));
- SpanData data = traceRule.getSpans().stream()
- .filter(s -> s.getName().equals("AsyncConnection.close")).findFirst().get();
- assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
- }
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
index 8e6409e..80db9a1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestHTableTracing.java
@@ -17,11 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.apache.hadoop.hbase.client.TestAsyncTableTracing.buildBaseAttributesMatcher;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
@@ -238,6 +239,8 @@ public class TestHTableTracing extends TestTracingBase {
}
private void assertTrace(String tableOperation, Matcher<SpanData> matcher) {
+ // N.B. keep this implementation in sync with the method of the same name in
+ // TestAsyncTableTracing.
final TableName tableName = table.getName();
final Matcher<SpanData> spanLocator = allOf(
hasName(containsString(tableOperation)), hasEnded());
@@ -255,7 +258,8 @@ public class TestHTableTracing extends TestTracingBase {
hasName(expectedName),
hasKind(SpanKind.CLIENT),
hasStatusWithCode(StatusCode.OK),
- buildBaseAttributesMatcher(tableName),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(tableName),
matcher));
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorTracing.java
index 8d93d2a..a0415da 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorTracing.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRegionLocatorTracing.java
@@ -17,9 +17,23 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntryWithStringValuesOf;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildConnectionAttributesMatcher;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.TraceTestUtil.buildTableAttributesMatcher;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
+import java.util.Arrays;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -51,33 +65,65 @@ public class TestRegionLocatorTracing extends TestTracingBase {
Closeables.close(conn, true);
}
-
@Test
public void testGetRegionLocation() throws IOException {
conn.getRegionLocator(TableName.META_TABLE_NAME).getRegionLocation(HConstants.EMPTY_START_ROW);
- assertTrace(HRegionLocator.class.getSimpleName(), "getRegionLocation",
- null, TableName.META_TABLE_NAME);
+ SpanData span = waitSpan("HRegionLocator.getRegionLocation");
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(TableName.META_TABLE_NAME),
+ hasAttributes(
+ containsEntryWithStringValuesOf("db.hbase.regions",
+ META_REGION_LOCATION.getDefaultRegionLocation().getRegion().getRegionNameAsString()))));
}
@Test
public void testGetRegionLocations() throws IOException {
conn.getRegionLocator(TableName.META_TABLE_NAME).getRegionLocations(HConstants.EMPTY_START_ROW);
- assertTrace(HRegionLocator.class.getSimpleName(), "getRegionLocations",
- null, TableName.META_TABLE_NAME);
+ SpanData span = waitSpan("HRegionLocator.getRegionLocations");
+ // TODO: Use a value of `META_REGION_LOCATION` that contains multiple region locations.
+ String[] expectedRegions = Arrays.stream(META_REGION_LOCATION.getRegionLocations())
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getRegionNameAsString)
+ .toArray(String[]::new);
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(TableName.META_TABLE_NAME),
+ hasAttributes(
+ containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions)))));
}
@Test
public void testGetAllRegionLocations() throws IOException {
conn.getRegionLocator(TableName.META_TABLE_NAME).getAllRegionLocations();
- assertTrace(HRegionLocator.class.getSimpleName(), "getAllRegionLocations",
- null, TableName.META_TABLE_NAME);
+ SpanData span = waitSpan("HRegionLocator.getAllRegionLocations");
+ // TODO: Use a value of `META_REGION_LOCATION` that contains multiple region locations.
+ String[] expectedRegions = Arrays.stream(META_REGION_LOCATION.getRegionLocations())
+ .map(HRegionLocation::getRegion)
+ .map(RegionInfo::getRegionNameAsString)
+ .toArray(String[]::new);
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(TableName.META_TABLE_NAME),
+ hasAttributes(
+ containsEntryWithStringValuesOf("db.hbase.regions", containsInAnyOrder(expectedRegions)))));
}
@Test
public void testClearRegionLocationCache() throws IOException {
conn.getRegionLocator(TableName.META_TABLE_NAME).clearRegionLocationCache();
- assertTrace(HRegionLocator.class.getSimpleName(), "clearRegionLocationCache",
- null, TableName.META_TABLE_NAME);
+ SpanData span = waitSpan("HRegionLocator.clearRegionLocationCache");
+ assertThat(span, allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasKind(SpanKind.CLIENT),
+ buildConnectionAttributesMatcher(conn),
+ buildTableAttributesMatcher(TableName.META_TABLE_NAME)));
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTracingBase.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTracingBase.java
index a172733..c2067e7 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTracingBase.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestTracingBase.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasEnded;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import io.opentelemetry.api.trace.SpanKind;
@@ -30,15 +34,20 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
+import org.hamcrest.Matcher;
import org.junit.Before;
import org.junit.ClassRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestTracingBase {
+ private static final Logger LOG = LoggerFactory.getLogger(TestTracingBase.class);
protected static final ServerName MASTER_HOST = ServerName.valueOf("localhost", 16010, 12345);
protected static final RegionLocations META_REGION_LOCATION =
@@ -86,6 +95,28 @@ public class TestTracingBase {
}
}
+ protected SpanData waitSpan(String name) {
+ return waitSpan(hasName(name));
+ }
+
+ protected SpanData waitSpan(Matcher<SpanData> matcher) {
+ Matcher<SpanData> spanLocator = allOf(matcher, hasEnded());
+ try {
+ Waiter.waitFor(conf, 1000, new MatcherPredicate<>(
+ "waiting for span",
+ () -> TRACE_RULE.getSpans(), hasItem(spanLocator)));
+ } catch (AssertionError e) {
+ LOG.error("AssertionError while waiting for matching span. Span reservoir contains: {}",
+ TRACE_RULE.getSpans());
+ throw e;
+ }
+ return TRACE_RULE.getSpans()
+ .stream()
+ .filter(spanLocator::matches)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
+ }
+
static class RegistryForTracingTest implements ConnectionRegistry {
public RegistryForTracingTest(Configuration conf) {
@@ -106,6 +137,10 @@ public class TestTracingBase {
return CompletableFuture.completedFuture(MASTER_HOST);
}
+ @Override public String getConnectionString() {
+ return "nothing";
+ }
+
@Override public void close() {
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
index c3bf3be..c7bb205 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/AttributesMatchers.java
@@ -22,6 +22,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasProperty;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.Attributes;
+import java.util.Arrays;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.TypeSafeMatcher;
@@ -48,6 +49,17 @@ public final class AttributesMatchers {
return containsEntry(AttributeKey.stringKey(key), value);
}
+ public static Matcher<Attributes> containsEntryWithStringValuesOf(String key, String... values) {
+ return containsEntry(AttributeKey.stringArrayKey(key), Arrays.asList(values));
+ }
+
+ public static Matcher<Attributes> containsEntryWithStringValuesOf(
+ String key,
+ Matcher<Iterable<? extends String>> matcher
+ ) {
+ return new IsAttributesContaining<>(equalTo(AttributeKey.stringArrayKey(key)), matcher);
+ }
+
private static final class IsAttributesContaining<T> extends TypeSafeMatcher<Attributes> {
private final Matcher<AttributeKey<? super T>> keyMatcher;
private final Matcher<? super T> valueMatcher;
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java
new file mode 100644
index 0000000..21d37e8
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/TraceTestUtil.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client.trace.hamcrest;
+
+import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
+import static org.hamcrest.Matchers.allOf;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
+import org.apache.hadoop.hbase.client.ConnectionImplementation;
+import org.hamcrest.Matcher;
+
+public final class TraceTestUtil {
+
+ private TraceTestUtil() { }
+
+ /**
+ * All {@link Span}s involving {@code conn} should include these attributes.
+ */
+ public static Matcher<SpanData> buildConnectionAttributesMatcher(AsyncConnectionImpl conn) {
+ return hasAttributes(allOf(
+ containsEntry("db.system", "hbase"),
+ containsEntry("db.connection_string", "nothing"),
+ containsEntry("db.user", conn.getUser().toString())));
+ }
+
+ /**
+ * All {@link Span}s involving {@code conn} should include these attributes.
+ * @see #buildConnectionAttributesMatcher(AsyncConnectionImpl)
+ */
+ public static Matcher<SpanData> buildConnectionAttributesMatcher(ConnectionImplementation conn) {
+ return hasAttributes(allOf(
+ containsEntry("db.system", "hbase"),
+ containsEntry("db.connection_string", "nothing"),
+ containsEntry("db.user", conn.getUser().toString())));
+ }
+
+ /**
+ * All {@link Span}s involving {@code tableName} should include these attributes.
+ */
+ public static Matcher<SpanData> buildTableAttributesMatcher(TableName tableName) {
+ return hasAttributes(allOf(
+ containsEntry("db.name", tableName.getNamespaceAsString()),
+ containsEntry("db.hbase.namespace", tableName.getNamespaceAsString()),
+ containsEntry("db.hbase.table", tableName.getNameAsString())));
+ }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
index 90c3c85..59c372f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
@@ -28,6 +28,11 @@ import org.apache.yetus.audience.InterfaceAudience;
*/
@InterfaceAudience.Private
public final class HBaseSemanticAttributes {
+ public static final AttributeKey<String> DB_SYSTEM = SemanticAttributes.DB_SYSTEM;
+ public static final String DB_SYSTEM_VALUE = SemanticAttributes.DbSystemValues.HBASE;
+ public static final AttributeKey<String> DB_CONNECTION_STRING =
+ SemanticAttributes.DB_CONNECTION_STRING;
+ public static final AttributeKey<String> DB_USER = SemanticAttributes.DB_USER;
public static final AttributeKey<String> DB_NAME = SemanticAttributes.DB_NAME;
public static final AttributeKey<String> NAMESPACE_KEY = SemanticAttributes.DB_HBASE_NAMESPACE;
public static final AttributeKey<String> DB_OPERATION = SemanticAttributes.DB_OPERATION;
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
index 1c428ae..a01962e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.trace;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NAMESPACE_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.TABLE_KEY;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
@@ -30,7 +28,6 @@ import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Version;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;
@@ -53,15 +50,6 @@ public final class TraceUtil {
}
/**
- * Create a {@link SpanKind#INTERNAL} span and set table related attributes.
- */
- public static Span createTableSpan(String spanName, TableName tableName) {
- return createSpan(spanName)
- .setAttribute(NAMESPACE_KEY, tableName.getNamespaceAsString())
- .setAttribute(TABLE_KEY, tableName.getNameAsString());
- }
-
- /**
* Create a span with the given {@code kind}. Notice that, OpenTelemetry only expects one
* {@link SpanKind#CLIENT} span and one {@link SpanKind#SERVER} span for a traced request, so use
* this with caution when you want to create spans with kind other than {@link SpanKind#INTERNAL}.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java
index cdfbb6d..93eb7e8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java
@@ -75,6 +75,11 @@ public class RegionServerRegistry implements ConnectionRegistry {
}
@Override
+ public String getConnectionString() {
+ return "short-circuit";
+ }
+
+ @Override
public void close() {
// nothing
}