You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2022/02/10 12:00:15 UTC
[hbase] branch branch-2.5 updated: HBASE-26521 Name RPC spans as `$package.$service/$method` (#4024)
This is an automated email from the ASF dual-hosted git repository.
ndimiduk pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 600a6a8 HBASE-26521 Name RPC spans as `$package.$service/$method` (#4024)
600a6a8 is described below
commit 600a6a8faf18354c41224176ee0c2d44645545e3
Author: Nick Dimiduk <nd...@apache.org>
AuthorDate: Wed Feb 9 15:22:31 2022 +0100
HBASE-26521 Name RPC spans as `$package.$service/$method` (#4024)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hbase/client/trace/ConnectionSpanBuilder.java | 24 +++-
...nSpanBuilder.java => IpcClientSpanBuilder.java} | 88 +++++++-----
.../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 14 +-
.../client/trace/hamcrest/SpanDataMatchers.java | 24 ++++
.../hbase/trace/HBaseSemanticAttributes.java | 19 ++-
hbase-server/pom.xml | 6 +
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 19 +--
.../hbase/server/trace/IpcServerSpanBuilder.java | 92 ++++++++++++
.../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 159 ++++++++++++++-------
pom.xml | 7 +
10 files changed, 329 insertions(+), 123 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
index 93a9d8f..27081a7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
@@ -32,6 +32,7 @@ import java.util.Optional;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
@@ -91,15 +92,32 @@ public class ConnectionSpanBuilder implements Supplier<Span> {
* Static utility method that performs the primary logic of this builder. It is visible to other
* classes in this package so that other builders can use this functionality as a mix-in.
* @param attributes the attributes map to be populated.
- * @param conn the source of attribute values.
+ * @param conn the source of connection attribute values.
*/
static void populateConnectionAttributes(
final Map<AttributeKey<?>, Object> attributes,
final AsyncConnectionImpl conn
) {
+ final Supplier<String> connStringSupplier = () -> conn.getConnectionRegistry()
+ .getConnectionString();
+ populateConnectionAttributes(attributes, connStringSupplier, conn::getUser);
+ }
+
+ /**
+ * Static utility method that performs the primary logic of this builder. It is visible to other
+ * classes in this package so that other builders can use this functionality as a mix-in.
+ * @param attributes the attributes map to be populated.
+ * @param connectionStringSupplier the source of the {@code db.connection_string} attribute value.
+ * @param userSupplier the source of the {@code db.user} attribute value.
+ */
+ static void populateConnectionAttributes(
+ final Map<AttributeKey<?>, Object> attributes,
+ final Supplier<String> connectionStringSupplier,
+ final Supplier<User> userSupplier
+ ) {
attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
- attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString());
- attributes.put(DB_USER, Optional.ofNullable(conn.getUser())
+ attributes.put(DB_CONNECTION_STRING, connectionStringSupplier.get());
+ attributes.put(DB_USER, Optional.ofNullable(userSupplier.get())
.map(Object::toString)
.orElse(null));
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/IpcClientSpanBuilder.java
similarity index 51%
copy from hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
copy to hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/IpcClientSpanBuilder.java
index 93a9d8f..07edbcb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/ConnectionSpanBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/trace/IpcClientSpanBuilder.java
@@ -18,48 +18,51 @@
package org.apache.hadoop.hbase.client.trace;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_CONNECTION_STRING;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_SYSTEM;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_SYSTEM_VALUE;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.DB_USER;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NET_PEER_NAME;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.NET_PEER_PORT;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SYSTEM;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.SpanKind;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.function.Supplier;
-import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
-import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.net.Address;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
/**
- * Construct {@link Span} instances originating from the client side of a connection.
+ * Construct {@link Span} instances originating from the client side of an IPC.
+ *
+ * @see <a href="https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e249f60c3a5f68746f5e84d10195ba41a79/specification/trace/semantic_conventions/rpc.md">Semantic conventions for RPC spans</a>
*/
@InterfaceAudience.Private
-public class ConnectionSpanBuilder implements Supplier<Span> {
+public class IpcClientSpanBuilder implements Supplier<Span> {
private String name;
private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
- public ConnectionSpanBuilder(final AsyncConnectionImpl conn) {
- populateConnectionAttributes(attributes, conn);
- }
-
@Override
public Span get() {
return build();
}
- public ConnectionSpanBuilder setName(final String name) {
- this.name = name;
+ public IpcClientSpanBuilder setMethodDescriptor(final Descriptors.MethodDescriptor md) {
+ final String packageAndService = getRpcPackageAndService(md.getService());
+ final String method = getRpcName(md);
+ this.name = buildSpanName(packageAndService, method);
+ populateMethodDescriptorAttributes(attributes, md);
return this;
}
- public <T> ConnectionSpanBuilder addAttribute(final AttributeKey<T> key, T value) {
- attributes.put(key, value);
+ public IpcClientSpanBuilder setRemoteAddress(final Address remoteAddress) {
+ attributes.put(NET_PEER_NAME, remoteAddress.getHostName());
+ attributes.put(NET_PEER_PORT, (long) remoteAddress.getPort());
return this;
}
@@ -74,33 +77,42 @@ public class ConnectionSpanBuilder implements Supplier<Span> {
}
/**
- * @see #populateConnectionAttributes(Map, AsyncConnectionImpl)
+ * Static utility method that performs the primary logic of this builder. It is visible to other
+ * classes in this package so that other builders can use this functionality as a mix-in.
+ * @param attributes the attributes map to be populated.
+ * @param md the source of the RPC attribute values.
*/
- static void populateConnectionAttributes(
+ static void populateMethodDescriptorAttributes(
final Map<AttributeKey<?>, Object> attributes,
- final ClusterConnection conn
+ final Descriptors.MethodDescriptor md
) {
- attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
- attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString());
- attributes.put(DB_USER, Optional.ofNullable(conn.getUser())
- .map(Object::toString)
- .orElse(null));
+ final String packageAndService = getRpcPackageAndService(md.getService());
+ final String method = getRpcName(md);
+ attributes.put(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
+ attributes.put(RPC_SERVICE, packageAndService);
+ attributes.put(RPC_METHOD, method);
}
/**
- * Static utility method that performs the primary logic of this builder. It is visible to other
- * classes in this package so that other builders can use this functionality as a mix-in.
- * @param attributes the attributes map to be populated.
- * @param conn the source of attribute values.
+ * Retrieve the combined {@code $package.$service} value from {@code sd}.
*/
- static void populateConnectionAttributes(
- final Map<AttributeKey<?>, Object> attributes,
- final AsyncConnectionImpl conn
- ) {
- attributes.put(DB_SYSTEM, DB_SYSTEM_VALUE);
- attributes.put(DB_CONNECTION_STRING, conn.getConnectionRegistry().getConnectionString());
- attributes.put(DB_USER, Optional.ofNullable(conn.getUser())
- .map(Object::toString)
- .orElse(null));
+ public static String getRpcPackageAndService(final Descriptors.ServiceDescriptor sd) {
+ // it happens that `getFullName` returns a string in the $package.$service format required by
+ // the otel RPC specification. Use it for now; might have to parse the value in the future.
+ return sd.getFullName();
+ }
+
+ /**
+ * Retrieve the {@code $method} value from {@code md}.
+ */
+ public static String getRpcName(final Descriptors.MethodDescriptor md) {
+ return md.getName();
+ }
+
+ /**
+ * Construct an RPC span name.
+ */
+ public static String buildSpanName(final String packageAndService, final String method) {
+ return packageAndService + "/" + method;
}
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index b502142..accb832 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_HOST_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.REMOTE_PORT_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
@@ -40,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.MetricsConnection;
+import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
@@ -399,11 +396,10 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket, final Address addr,
final RpcCallback<Message> callback) {
- Span span = TraceUtil.createClientSpan("RpcClient.callMethod")
- .setAttribute(RPC_SERVICE_KEY, md.getService().getName())
- .setAttribute(RPC_METHOD_KEY, md.getName())
- .setAttribute(REMOTE_HOST_KEY, addr.getHostName())
- .setAttribute(REMOTE_PORT_KEY, addr.getPort());
+ Span span = new IpcClientSpanBuilder()
+ .setMethodDescriptor(md)
+ .setRemoteAddress(addr)
+ .build();
try (Scope scope = span.makeCurrent()) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java
index 2839e7c..9697d69 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/trace/hamcrest/SpanDataMatchers.java
@@ -24,6 +24,7 @@ import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.sdk.trace.data.StatusData;
+import java.time.Duration;
import org.hamcrest.Description;
import org.hamcrest.FeatureMatcher;
import org.hamcrest.Matcher;
@@ -46,6 +47,16 @@ public final class SpanDataMatchers {
};
}
+ public static Matcher<SpanData> hasDuration(Matcher<Duration> matcher) {
+ return new FeatureMatcher<SpanData, Duration>(
+ matcher, "SpanData having duration that ", "duration") {
+ @Override
+ protected Duration featureValueOf(SpanData item) {
+ return Duration.ofNanos(item.getEndEpochNanos() - item.getStartEpochNanos());
+ }
+ };
+ }
+
public static Matcher<SpanData> hasEnded() {
return new TypeSafeMatcher<SpanData>() {
@Override protected boolean matchesSafely(SpanData item) {
@@ -92,4 +103,17 @@ public final class SpanDataMatchers {
}
};
}
+
+ public static Matcher<SpanData> hasTraceId(String traceId) {
+ return hasTraceId(is(equalTo(traceId)));
+ }
+
+ public static Matcher<SpanData> hasTraceId(Matcher<String> matcher) {
+ return new FeatureMatcher<SpanData, String>(
+ matcher, "SpanData with a traceId that ", "traceId") {
+ @Override protected String featureValueOf(SpanData item) {
+ return item.getTraceId();
+ }
+ };
+ }
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
index 1a74fdc..1689a44 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/HBaseSemanticAttributes.java
@@ -44,14 +44,13 @@ public final class HBaseSemanticAttributes {
AttributeKey.stringArrayKey("db.hbase.container_operations");
public static final AttributeKey<List<String>> REGION_NAMES_KEY =
AttributeKey.stringArrayKey("db.hbase.regions");
- public static final AttributeKey<String> RPC_SERVICE_KEY =
- AttributeKey.stringKey("db.hbase.rpc.service");
- public static final AttributeKey<String> RPC_METHOD_KEY =
- AttributeKey.stringKey("db.hbase.rpc.method");
+ public static final AttributeKey<String> RPC_SYSTEM = SemanticAttributes.RPC_SYSTEM;
+ public static final AttributeKey<String> RPC_SERVICE = SemanticAttributes.RPC_SERVICE;
+ public static final AttributeKey<String> RPC_METHOD = SemanticAttributes.RPC_METHOD;
public static final AttributeKey<String> SERVER_NAME_KEY =
AttributeKey.stringKey("db.hbase.server.name");
- public static final AttributeKey<String> REMOTE_HOST_KEY = SemanticAttributes.NET_PEER_NAME;
- public static final AttributeKey<Long> REMOTE_PORT_KEY = SemanticAttributes.NET_PEER_PORT;
+ public static final AttributeKey<String> NET_PEER_NAME = SemanticAttributes.NET_PEER_NAME;
+ public static final AttributeKey<Long> NET_PEER_PORT = SemanticAttributes.NET_PEER_PORT;
public static final AttributeKey<Boolean> ROW_LOCK_READ_LOCK_KEY =
AttributeKey.booleanKey("db.hbase.rowlock.readlock");
public static final AttributeKey<String> WAL_IMPL = AttributeKey.stringKey("db.hbase.wal.impl");
@@ -74,5 +73,13 @@ public final class HBaseSemanticAttributes {
SCAN,
}
+ /**
+ * These are values used with {@link #RPC_SYSTEM}. Only a single value for now; more to come as
+ * we add tracing over our gateway components.
+ */
+ public enum RpcSystem {
+ HBASE_RPC,
+ }
+
private HBaseSemanticAttributes() { }
}
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index 50ca081..48350c0 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -257,6 +257,12 @@
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-client</artifactId>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
<artifactId>hbase-http</artifactId>
<exclusions>
<exclusion>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index ef37247..9607a71 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -17,11 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD_KEY;
-import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE_KEY;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
-import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
@@ -32,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.server.trace.IpcServerSpanBuilder;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@@ -90,14 +88,6 @@ public class CallRunner {
this.rpcServer = null;
}
- private String getServiceName() {
- return call.getService() != null ? call.getService().getDescriptorForType().getName() : "";
- }
-
- private String getMethodName() {
- return call.getMethod() != null ? call.getMethod().getName() : "";
- }
-
public void run() {
try {
if (call.disconnectSince() >= 0) {
@@ -122,12 +112,7 @@ public class CallRunner {
String error = null;
Pair<Message, CellScanner> resultPair = null;
RpcServer.CurCall.set(call);
- String serviceName = getServiceName();
- String methodName = getMethodName();
- Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcServer.callMethod")
- .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan()
- .setAttribute(RPC_SERVICE_KEY, serviceName)
- .setAttribute(RPC_METHOD_KEY, methodName);
+ Span span = new IpcServerSpanBuilder(call).build();
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/server/trace/IpcServerSpanBuilder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/server/trace/IpcServerSpanBuilder.java
new file mode 100644
index 0000000..259268b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/server/trace/IpcServerSpanBuilder.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.server.trace;
+
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_METHOD;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SERVICE;
+import static org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RPC_SYSTEM;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.SpanBuilder;
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.context.Context;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+import org.apache.hadoop.hbase.client.trace.IpcClientSpanBuilder;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes.RpcSystem;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+
+/**
+ * Construct {@link Span} instances originating from the server side of an IPC.
+ *
+ * @see <a href="https://github.com/open-telemetry/opentelemetry-specification/blob/3e380e249f60c3a5f68746f5e84d10195ba41a79/specification/trace/semantic_conventions/rpc.md">Semantic conventions for RPC spans</a>
+ */
+@InterfaceAudience.Private
+public class IpcServerSpanBuilder implements Supplier<Span> {
+
+ private final RpcCall rpcCall;
+ private String name;
+ private final Map<AttributeKey<?>, Object> attributes = new HashMap<>();
+
+ public IpcServerSpanBuilder(final RpcCall rpcCall) {
+ this.rpcCall = rpcCall;
+ final String packageAndService = Optional.ofNullable(rpcCall.getService())
+ .map(BlockingService::getDescriptorForType)
+ .map(IpcClientSpanBuilder::getRpcPackageAndService)
+ .orElse("");
+ final String method = Optional.ofNullable(rpcCall.getMethod())
+ .map(IpcClientSpanBuilder::getRpcName)
+ .orElse("");
+ setName(IpcClientSpanBuilder.buildSpanName(packageAndService, method));
+ addAttribute(RPC_SYSTEM, RpcSystem.HBASE_RPC.name());
+ addAttribute(RPC_SERVICE, packageAndService);
+ addAttribute(RPC_METHOD, method);
+ }
+
+ @Override
+ public Span get() {
+ return build();
+ }
+
+ public IpcServerSpanBuilder setName(final String name) {
+ this.name = name;
+ return this;
+ }
+
+ public <T> IpcServerSpanBuilder addAttribute(final AttributeKey<T> key, T value) {
+ attributes.put(key, value);
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public Span build() {
+ final SpanBuilder builder = TraceUtil.getGlobalTracer()
+ .spanBuilder(name)
+ .setSpanKind(SpanKind.SERVER);
+ attributes.forEach((k, v) -> builder.setAttribute((AttributeKey<? super Object>) k, v));
+ return builder.setParent(Context.current().with(((ServerCall<?>) rpcCall).getSpan()))
+ .startSpan();
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 030b052..1309ef7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -17,11 +17,21 @@
*/
package org.apache.hadoop.hbase.ipc;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.AttributesMatchers.containsEntry;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasAttributes;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasDuration;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasKind;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasName;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasStatusWithCode;
+import static org.apache.hadoop.hbase.client.trace.hamcrest.SpanDataMatchers.hasTraceId;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.everyItem;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@@ -33,16 +43,16 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
-
+import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -50,22 +60,21 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MatcherPredicate;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
-import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
+import org.hamcrest.Matcher;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -442,74 +451,124 @@ public abstract class AbstractTestIPC {
}
}
- private void assertSameTraceId() {
- String traceId = traceRule.getSpans().get(0).getTraceId();
- for (SpanData data : traceRule.getSpans()) {
- // assert we are the same trace
- assertEquals(traceId, data.getTraceId());
- }
+ private SpanData waitSpan(Matcher<SpanData> matcher) {
+ Waiter.waitFor(CONF, 1000, new MatcherPredicate<>(
+ () -> traceRule.getSpans(), hasItem(matcher)));
+ return traceRule.getSpans()
+ .stream()
+ .filter(matcher::matches)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
}
- private SpanData waitSpan(String name) {
- Waiter.waitFor(CONF, 1000,
- () -> traceRule.getSpans().stream().map(SpanData::getName).anyMatch(s -> s.equals(name)));
- return traceRule.getSpans().stream().filter(s -> s.getName().equals(name)).findFirst().get();
+ private static String buildIpcSpanName(final String packageAndService, final String methodName) {
+ return packageAndService + "/" + methodName;
}
- private void assertRpcAttribute(SpanData data, String methodName, InetSocketAddress addr,
- SpanKind kind) {
- assertEquals(SERVICE.getDescriptorForType().getName(),
- data.getAttributes().get(HBaseSemanticAttributes.RPC_SERVICE_KEY));
- assertEquals(methodName, data.getAttributes().get(HBaseSemanticAttributes.RPC_METHOD_KEY));
- if (addr != null) {
- assertEquals(
- addr.getHostName(),
- data.getAttributes().get(HBaseSemanticAttributes.REMOTE_HOST_KEY));
- assertEquals(
- addr.getPort(),
- data.getAttributes().get(HBaseSemanticAttributes.REMOTE_PORT_KEY).intValue());
- }
- assertEquals(kind, data.getKind());
+ private static Matcher<SpanData> buildIpcClientSpanMatcher(
+ final String packageAndService,
+ final String methodName
+ ) {
+ return allOf(
+ hasName(buildIpcSpanName(packageAndService, methodName)),
+ hasKind(SpanKind.CLIENT)
+ );
+ }
+
+ private static Matcher<SpanData> buildIpcServerSpanMatcher(
+ final String packageAndService,
+ final String methodName
+ ) {
+ return allOf(
+ hasName(buildIpcSpanName(packageAndService, methodName)),
+ hasKind(SpanKind.SERVER)
+ );
+ }
+
+ private static Matcher<SpanData> buildIpcClientSpanAttributesMatcher(
+ final String packageAndService,
+ final String methodName,
+ final InetSocketAddress isa
+ ) {
+ return hasAttributes(allOf(
+ containsEntry("rpc.system", "HBASE_RPC"),
+ containsEntry("rpc.service", packageAndService),
+ containsEntry("rpc.method", methodName),
+ containsEntry("net.peer.name", isa.getHostName()),
+ containsEntry(AttributeKey.longKey("net.peer.port"), (long) isa.getPort())));
+ }
+
+ private static Matcher<SpanData> buildIpcServerSpanAttributesMatcher(
+ final String packageAndService,
+ final String methodName
+ ) {
+ return hasAttributes(allOf(
+ containsEntry("rpc.system", "HBASE_RPC"),
+ containsEntry("rpc.service", packageAndService),
+ containsEntry("rpc.method", methodName)));
}
private void assertRemoteSpan() {
- SpanData data = waitSpan("RpcServer.process");
+ SpanData data = waitSpan(hasName("RpcServer.process"));
assertTrue(data.getParentSpanContext().isRemote());
assertEquals(SpanKind.SERVER, data.getKind());
}
@Test
- public void testTracing() throws IOException, ServiceException {
+ public void testTracingSuccessIpc() throws IOException, ServiceException {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
- new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+ new InetSocketAddress("localhost", 0), CONF,
+ new FifoRpcScheduler(CONF, 1));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
- assertRpcAttribute(waitSpan("RpcClient.callMethod"), "pause", rpcServer.getListenerAddress(),
- SpanKind.CLIENT);
- assertRpcAttribute(waitSpan("RpcServer.callMethod"), "pause", null, SpanKind.INTERNAL);
+ // use the ISA from the running server so that we can get the port selected.
+ final InetSocketAddress isa = rpcServer.getListenerAddress();
+ final SpanData pauseClientSpan = waitSpan(buildIpcClientSpanMatcher(
+ "hbase.test.pb.TestProtobufRpcProto", "pause"));
+ assertThat(pauseClientSpan, buildIpcClientSpanAttributesMatcher(
+ "hbase.test.pb.TestProtobufRpcProto", "pause", isa));
+ final SpanData pauseServerSpan = waitSpan(buildIpcServerSpanMatcher(
+ "hbase.test.pb.TestProtobufRpcProto", "pause"));
+ assertThat(pauseServerSpan, buildIpcServerSpanAttributesMatcher(
+ "hbase.test.pb.TestProtobufRpcProto", "pause"));
assertRemoteSpan();
- assertSameTraceId();
- for (SpanData data : traceRule.getSpans()) {
- assertThat(
- TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()),
- greaterThanOrEqualTo(100L));
- assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
- }
+ assertFalse("no spans provided", traceRule.getSpans().isEmpty());
+ assertThat(traceRule.getSpans(), everyItem(allOf(
+ hasStatusWithCode(StatusCode.OK),
+ hasTraceId(traceRule.getSpans().iterator().next().getTraceId()),
+ hasDuration(greaterThanOrEqualTo(Duration.ofMillis(100L))))));
+ }
+ }
- traceRule.clearSpans();
+ @Test
+ public void testTracingErrorIpc() throws IOException {
+ RpcServer rpcServer = createRpcServer(null, "testRpcServer",
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+ new InetSocketAddress("localhost", 0), CONF,
+ new FifoRpcScheduler(CONF, 1));
+ try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
+ rpcServer.start();
+ BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+ // use the ISA from the running server so that we can get the port selected.
assertThrows(ServiceException.class,
() -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
- assertRpcAttribute(waitSpan("RpcClient.callMethod"), "error", rpcServer.getListenerAddress(),
- SpanKind.CLIENT);
- assertRpcAttribute(waitSpan("RpcServer.callMethod"), "error", null, SpanKind.INTERNAL);
+ final InetSocketAddress isa = rpcServer.getListenerAddress();
+ final SpanData errorClientSpan = waitSpan(buildIpcClientSpanMatcher(
+ "hbase.test.pb.TestProtobufRpcProto", "error"));
+ assertThat(errorClientSpan, buildIpcClientSpanAttributesMatcher(
+ "hbase.test.pb.TestProtobufRpcProto", "error", isa));
+ final SpanData errorServerSpan = waitSpan(buildIpcServerSpanMatcher(
+ "hbase.test.pb.TestProtobufRpcProto", "error"));
+ assertThat(errorServerSpan, buildIpcServerSpanAttributesMatcher(
+ "hbase.test.pb.TestProtobufRpcProto", "error"));
assertRemoteSpan();
- assertSameTraceId();
- for (SpanData data : traceRule.getSpans()) {
- assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
- }
+ assertFalse("no spans provided", traceRule.getSpans().isEmpty());
+ assertThat(traceRule.getSpans(), everyItem(allOf(
+ hasStatusWithCode(StatusCode.ERROR),
+ hasTraceId(traceRule.getSpans().iterator().next().getTraceId()))));
}
}
}
diff --git a/pom.xml b/pom.xml
index 2b908b8..ef44582 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1847,6 +1847,13 @@
<version>${project.version}</version>
</dependency>
<dependency>
+ <artifactId>hbase-client</artifactId>
+ <groupId>org.apache.hbase</groupId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<artifactId>hbase-metrics-api</artifactId>
<groupId>org.apache.hbase</groupId>
<version>${project.version}</version>