You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ta...@apache.org on 2021/08/05 02:59:40 UTC
[hbase] 02/04: HBASE-26125 Backport HBASE-25401 "Add trace support
for async call in rpc client" to branch-2 (#3543)
This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-25853
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 82d4765dfb45271acabc91d8b54890b3fe182d42
Author: Tak Lon (Stephen) Wu <ta...@apache.org>
AuthorDate: Mon Aug 2 11:33:19 2021 -0700
HBASE-26125 Backport HBASE-25401 "Add trace support for async call in rpc client" to branch-2 (#3543)
2/17 commits of HBASE-22120
Co-authored-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Peter Somogyi <ps...@apache.org>
---
.../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 75 ++++---
.../hadoop/hbase/ipc/BlockingRpcConnection.java | 21 +-
.../java/org/apache/hadoop/hbase/ipc/Call.java | 2 +-
.../java/org/apache/hadoop/hbase/ipc/IPCUtil.java | 12 +-
.../hadoop/hbase/ipc/NettyRpcDuplexHandler.java | 8 +-
.../org/apache/hadoop/hbase/trace/TraceUtil.java | 4 +-
.../src/main/protobuf/Tracing.proto | 14 +-
hbase-server/pom.xml | 10 +
.../org/apache/hadoop/hbase/ipc/CallRunner.java | 19 +-
.../org/apache/hadoop/hbase/ipc/ServerCall.java | 14 ++
.../hadoop/hbase/ipc/ServerRpcConnection.java | 222 ++++++++++++---------
.../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 59 +++++-
.../org/apache/hadoop/hbase/ipc/TestNettyIPC.java | 1 +
pom.xml | 12 +-
14 files changed, 299 insertions(+), 174 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 4bbb729..9117fef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.IPCUtil.toIOE;
import static org.apache.hadoop.hbase.ipc.IPCUtil.wrapException;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.net.SocketAddress;
import java.util.Collection;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.PoolMap;
import org.apache.hadoop.hbase.util.Threads;
@@ -365,7 +369,7 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
protected abstract T createConnection(ConnectionId remoteId) throws IOException;
private void onCallFinished(Call call, HBaseRpcController hrc, Address addr,
- RpcCallback<Message> callback) {
+ RpcCallback<Message> callback) {
call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime());
if (metrics != null) {
metrics.updateRpc(call.md, call.param, call.callStats);
@@ -388,44 +392,59 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
}
}
- Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
+ private Call callMethod(final Descriptors.MethodDescriptor md, final HBaseRpcController hrc,
final Message param, Message returnType, final User ticket,
final Address addr, final RpcCallback<Message> callback) {
- final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
- cs.setStartTime(EnvironmentEdgeManager.currentTime());
-
- if (param instanceof ClientProtos.MultiRequest) {
- ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
- int numActions = 0;
- for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
- numActions += regionAction.getActionCount();
- }
+ Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClient.callMethod." + md.getFullName())
+ .startSpan();
+ try (Scope scope = span.makeCurrent()) {
+ final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+ cs.setStartTime(EnvironmentEdgeManager.currentTime());
+
+ if (param instanceof ClientProtos.MultiRequest) {
+ ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
+ int numActions = 0;
+ for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
+ numActions += regionAction.getActionCount();
+ }
- cs.setNumActionsPerServer(numActions);
- }
+ cs.setNumActionsPerServer(numActions);
+ }
- final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
- Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
+ final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
+ Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
@Override
public void run(Call call) {
- counter.decrementAndGet();
- onCallFinished(call, hrc, addr, callback);
+ try (Scope scope = call.span.makeCurrent()) {
+ counter.decrementAndGet();
+ onCallFinished(call, hrc, addr, callback);
+ } finally {
+ if (hrc.failed()) {
+ span.setStatus(StatusCode.ERROR);
+ span.recordException(hrc.getFailed());
+ } else {
+ span.setStatus(StatusCode.OK);
+ }
+ span.end();
+ }
}
}, cs);
- ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
- int count = counter.incrementAndGet();
- try {
- if (count > maxConcurrentCallsPerServer) {
- throw new ServerTooBusyException(addr, count);
+ ConnectionId remoteId = new ConnectionId(ticket, md.getService().getName(), addr);
+ int count = counter.incrementAndGet();
+ try {
+ if (count > maxConcurrentCallsPerServer) {
+ throw new ServerTooBusyException(addr, count);
+ }
+ cs.setConcurrentCallsPerServer(count);
+ T connection = getConnection(remoteId);
+ connection.sendRequest(call, hrc);
+ } catch (Exception e) {
+ call.setException(toIOE(e));
+ span.end();
}
- cs.setConcurrentCallsPerServer(count);
- T connection = getConnection(remoteId);
- connection.sendRequest(call, hrc);
- } catch (Exception e) {
- call.setException(toIOE(e));
+ return call;
}
- return call;
}
private static Address createAddr(ServerName sn) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
index 1a5cb73..eb8e1d9 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java
@@ -24,8 +24,6 @@ import static org.apache.hadoop.hbase.ipc.IPCUtil.isFatalConnectionException;
import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled;
import static org.apache.hadoop.hbase.ipc.IPCUtil.write;
-import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -57,7 +55,6 @@ import org.apache.hadoop.hbase.security.HBaseSaslRpcClient;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.provider.SaslClientAuthenticationProvider;
-import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
import org.apache.hadoop.io.IOUtils;
@@ -192,8 +189,8 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (call.isDone()) {
continue;
}
- try {
- tracedWriteRequest(call);
+ try (Scope scope = call.span.makeCurrent()) {
+ writeRequest(call);
} catch (IOException e) {
// exception here means the call has not been added to the pendingCalls yet, so we need
// to fail it by our own.
@@ -594,16 +591,6 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
this.out = new DataOutputStream(new BufferedOutputStream(saslRpcClient.getOutputStream()));
}
- private void tracedWriteRequest(Call call) throws IOException {
- Span span = TraceUtil.getGlobalTracer().spanBuilder("RpcClientImpl.tracedWriteRequest")
- .setParent(Context.current().with(call.span)).startSpan();
- try (Scope scope = span.makeCurrent()) {
- writeRequest(call);
- } finally {
- span.end();
- }
- }
-
/**
* Initiates a call by sending the parameter to the remote server. Note: this is not called from
* the Connection thread, but by other threads.
@@ -811,7 +798,9 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
if (callSender != null) {
callSender.sendCall(call);
} else {
- tracedWriteRequest(call);
+ // This runs in the same thread as the caller, so there is no need to attach the trace
+ // context again.
+ writeRequest(call);
}
}
});
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index 113f731..8d23d92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -61,7 +61,7 @@ class Call {
final Span span;
Timeout timeoutTask;
- protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
+ Call(int id, final Descriptors.MethodDescriptor md, Message param,
final CellScanner cells, final Message responseDefaultType, int timeout, int priority,
RpcCallback<Call> callback, MetricsConnection.CallStats callStats) {
this.param = param;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
index 42ad33a..fd42214 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.context.Context;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
@@ -49,6 +51,7 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.FastThreadLocal;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
/**
* Utility to help ipc'ing.
@@ -112,11 +115,10 @@ class IPCUtil {
static RequestHeader buildRequestHeader(Call call, CellBlockMeta cellBlockMeta) {
RequestHeader.Builder builder = RequestHeader.newBuilder();
builder.setCallId(call.id);
- //TODO handle htrace API change, see HBASE-18895
- /*if (call.span != null) {
- builder.setTraceInfo(RPCTInfo.newBuilder().setParentId(call.span.getSpanId())
- .setTraceId(call.span.getTracerId()));
- }*/
+ RPCTInfo.Builder traceBuilder = RPCTInfo.newBuilder();
+ GlobalOpenTelemetry.getPropagators().getTextMapPropagator().inject(Context.current(),
+ traceBuilder, (carrier, key, value) -> carrier.putHeaders(key, value));
+ builder.setTraceInfo(traceBuilder.build());
builder.setMethodName(call.md.getName());
builder.setRequestParam(call.param != null);
if (cellBlockMeta != null) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
index f31e3d2..c67d96f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.ipc;
+import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
@@ -114,9 +115,12 @@ class NettyRpcDuplexHandler extends ChannelDuplexHandler {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
- throws Exception {
+ throws Exception {
if (msg instanceof Call) {
- writeRequest(ctx, (Call) msg, promise);
+ Call call = (Call) msg;
+ try (Scope scope = call.span.makeCurrent()) {
+ writeRequest(ctx, call, promise);
+ }
} else {
ctx.write(msg, promise);
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
index f7a111f..768de9c 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hbase.trace;
-import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Tracer;
import org.apache.yetus.audience.InterfaceAudience;
@@ -30,6 +30,6 @@ public final class TraceUtil {
}
public static Tracer getGlobalTracer() {
- return OpenTelemetry.getGlobalTracer(INSTRUMENTATION_NAME);
+ return GlobalOpenTelemetry.getTracer(INSTRUMENTATION_NAME);
}
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Tracing.proto b/hbase-protocol-shaded/src/main/protobuf/Tracing.proto
index 64ead84..276a0a7 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Tracing.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Tracing.proto
@@ -23,12 +23,12 @@ option java_outer_classname = "TracingProtos";
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;
-//Used to pass through the information necessary to continue
-//a trace after an RPC is made. All we need is the traceid
-//(so we know the overarching trace this message is a part of), and
-//the id of the current span when this message was sent, so we know
-//what span caused the new span we will create when this message is received.
+// OpenTelemetry propagates trace context using https://www.w3.org/TR/trace-context/, which
+// is a text-based approach that passes properties as HTTP headers. We use the same approach
+// here, so all we need is a map to store the key/value pairs.
+
message RPCTInfo {
- optional int64 trace_id = 1;
- optional int64 parent_id = 2;
+ optional int64 trace_id = 1 [deprecated = true];
+ optional int64 parent_id = 2 [deprecated = true];
+ map<string, string> headers = 3;
}
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index a9a7b72..ef2fad3 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -441,6 +441,16 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index 3ae089e..203f079 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.ipc;
import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
@@ -73,15 +75,6 @@ public class CallRunner {
return call;
}
- /**
- * Keep for backward compatibility.
- * @deprecated As of release 2.0, this will be removed in HBase 3.0
- */
- @Deprecated
- public ServerCall<?> getCall() {
- return (ServerCall<?>) call;
- }
-
public void setStatus(MonitoredRPCHandler status) {
this.status = status;
}
@@ -130,7 +123,8 @@ public class CallRunner {
String serviceName = getServiceName();
String methodName = getMethodName();
String traceString = serviceName + "." + methodName;
- Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString).startSpan();
+ Span span = TraceUtil.getGlobalTracer().spanBuilder(traceString)
+ .setParent(Context.current().with(((ServerCall<?>) call).getSpan())).startSpan();
try (Scope traceScope = span.makeCurrent()) {
if (!this.rpcServer.isStarted()) {
InetSocketAddress address = rpcServer.getListenerAddress();
@@ -141,8 +135,12 @@ public class CallRunner {
resultPair = this.rpcServer.call(call, this.status);
} catch (TimeoutIOException e){
RpcServer.LOG.warn("Can not complete this request in time, drop it: " + call);
+ span.recordException(e);
+ span.setStatus(StatusCode.ERROR);
return;
} catch (Throwable e) {
+ span.recordException(e);
+ span.setStatus(StatusCode.ERROR);
if (e instanceof ServerNotRunningYetException) {
// If ServerNotRunningYetException, don't spew stack trace.
if (RpcServer.LOG.isTraceEnabled()) {
@@ -161,6 +159,7 @@ public class CallRunner {
RpcServer.CurCall.set(null);
if (resultPair != null) {
this.rpcServer.addCallSize(call.getSize() * -1);
+ span.setStatus(StatusCode.OK);
sucessful = true;
}
span.end();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
index ff4a521..4a021ce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerCall.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.ipc;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
@@ -102,6 +104,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
// from WAL side on release
private final AtomicInteger reference = new AtomicInteger(0x80000000);
+ private final Span span;
+
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NP_NULL_ON_SOME_PATH",
justification = "Can't figure why this complaint is happening... see below")
ServerCall(int id, BlockingService service, MethodDescriptor md, RequestHeader header,
@@ -132,6 +136,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
this.bbAllocator = byteBuffAllocator;
this.cellBlockBuilder = cellBlockBuilder;
this.reqCleanup = reqCleanup;
+ this.span = Span.current();
}
/**
@@ -150,6 +155,7 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
// If the call was run successfuly, we might have already returned the BB
// back to pool. No worries..Then inputCellBlock will be null
cleanup();
+ span.end();
}
@Override
@@ -226,6 +232,10 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
}
if (t != null) {
this.isError = true;
+ span.recordException(t);
+ span.setStatus(StatusCode.ERROR);
+ } else {
+ span.setStatus(StatusCode.OK);
}
BufferChain bc = null;
try {
@@ -560,4 +570,8 @@ public abstract class ServerCall<T extends ServerRpcConnection> implements RpcCa
return response;
}
}
+
+ public Span getSpan() {
+ return span;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index 29ce30b..db7f052 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -19,6 +19,11 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.HConstants.RPC_HEADER;
+import io.opentelemetry.api.GlobalOpenTelemetry;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.context.Context;
+import io.opentelemetry.context.Scope;
+import io.opentelemetry.context.propagation.TextMapPropagator;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
@@ -31,13 +36,11 @@ import java.nio.channels.ReadableByteChannel;
import java.security.GeneralSecurityException;
import java.util.Objects;
import java.util.Properties;
-
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.VersionInfoUtil;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
@@ -53,6 +56,20 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProvider;
import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders;
import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableUtils;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.ProxyUsers;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteInput;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
@@ -61,6 +78,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescrip
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
@@ -68,17 +86,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHea
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo;
/** Reads calls from a connection and queues them for handling. */
@edu.umd.cs.findbugs.annotations.SuppressWarnings(
@@ -607,99 +615,115 @@ abstract class ServerRpcConnection implements Closeable {
ProtobufUtil.mergeFrom(builder, cis, headerSize);
RequestHeader header = (RequestHeader) builder.build();
offset += headerSize;
- int id = header.getCallId();
- if (RpcServer.LOG.isTraceEnabled()) {
- RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header)
- + " totalRequestSize: " + totalRequestSize + " bytes");
- }
- // Enforcing the call queue size, this triggers a retry in the client
- // This is a bit late to be doing this check - we have already read in the
- // total request.
- if ((totalRequestSize +
- this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
- final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
- totalRequestSize, null, 0, this.callCleanup);
- this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
- callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + this.rpcServer.server.getServerName() +
- ", is hbase.ipc.server.max.callqueue.size too small?");
- callTooBig.sendResponseIfReady();
- return;
- }
- MethodDescriptor md = null;
- Message param = null;
- CellScanner cellScanner = null;
- try {
- if (header.hasRequestParam() && header.getRequestParam()) {
- md = this.service.getDescriptorForType().findMethodByName(
- header.getMethodName());
- if (md == null)
- throw new UnsupportedOperationException(header.getMethodName());
- builder = this.service.getRequestPrototype(md).newBuilderForType();
- cis.resetSizeCounter();
- int paramSize = cis.readRawVarint32();
- offset += cis.getTotalBytesRead();
- if (builder != null) {
- ProtobufUtil.mergeFrom(builder, cis, paramSize);
- param = builder.build();
- }
- offset += paramSize;
- } else {
- // currently header must have request param, so we directly throw
- // exception here
- String msg = "Invalid request header: "
- + TextFormat.shortDebugString(header)
- + ", should have param set in it";
- RpcServer.LOG.warn(msg);
- throw new DoNotRetryIOException(msg);
+ TextMapPropagator.Getter<RPCTInfo> getter = new TextMapPropagator.Getter<RPCTInfo>() {
+
+ @Override
+ public Iterable<String> keys(RPCTInfo carrier) {
+ return carrier.getHeadersMap().keySet();
}
- if (header.hasCellBlockMeta()) {
- buf.position(offset);
- ByteBuff dup = buf.duplicate();
- dup.limit(offset + header.getCellBlockMeta().getLength());
- cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(
- this.codec, this.compressionCodec, dup);
+
+ @Override
+ public String get(RPCTInfo carrier, String key) {
+ return carrier.getHeadersMap().get(key);
}
- } catch (Throwable t) {
- InetSocketAddress address = this.rpcServer.getListenerAddress();
- String msg = (address != null ? address : "(channel closed)")
- + " is unable to read call parameter from client "
- + getHostAddress();
- RpcServer.LOG.warn(msg, t);
-
- this.rpcServer.metrics.exception(t);
-
- // probably the hbase hadoop version does not match the running hadoop
- // version
- if (t instanceof LinkageError) {
- t = new DoNotRetryIOException(t);
+ };
+ Context traceCtx = GlobalOpenTelemetry.getPropagators().getTextMapPropagator()
+ .extract(Context.current(), header.getTraceInfo(), getter);
+ Span span =
+ TraceUtil.getGlobalTracer().spanBuilder("RpcServer.process").setParent(traceCtx).startSpan();
+ try (Scope scope = span.makeCurrent()) {
+ int id = header.getCallId();
+ if (RpcServer.LOG.isTraceEnabled()) {
+ RpcServer.LOG.trace("RequestHeader " + TextFormat.shortDebugString(header) +
+ " totalRequestSize: " + totalRequestSize + " bytes");
}
- // If the method is not present on the server, do not retry.
- if (t instanceof UnsupportedOperationException) {
- t = new DoNotRetryIOException(t);
+ // Enforcing the call queue size, this triggers a retry in the client
+ // This is a bit late to be doing this check - we have already read in the
+ // total request.
+ if ((totalRequestSize +
+ this.rpcServer.callQueueSizeInBytes.sum()) > this.rpcServer.maxQueueSizeInBytes) {
+ final ServerCall<?> callTooBig = createCall(id, this.service, null, null, null, null,
+ totalRequestSize, null, 0, this.callCleanup);
+ this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
+ callTooBig.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+ "Call queue is full on " + this.rpcServer.server.getServerName() +
+ ", is hbase.ipc.server.max.callqueue.size too small?");
+ callTooBig.sendResponseIfReady();
+ return;
}
+ MethodDescriptor md = null;
+ Message param = null;
+ CellScanner cellScanner = null;
+ try {
+ if (header.hasRequestParam() && header.getRequestParam()) {
+ md = this.service.getDescriptorForType().findMethodByName(header.getMethodName());
+ if (md == null) {
+ throw new UnsupportedOperationException(header.getMethodName());
+ }
+ builder = this.service.getRequestPrototype(md).newBuilderForType();
+ cis.resetSizeCounter();
+ int paramSize = cis.readRawVarint32();
+ offset += cis.getTotalBytesRead();
+ if (builder != null) {
+ ProtobufUtil.mergeFrom(builder, cis, paramSize);
+ param = builder.build();
+ }
+ offset += paramSize;
+ } else {
+ // currently header must have request param, so we directly throw
+ // exception here
+ String msg = "Invalid request header: " + TextFormat.shortDebugString(header) +
+ ", should have param set in it";
+ RpcServer.LOG.warn(msg);
+ throw new DoNotRetryIOException(msg);
+ }
+ if (header.hasCellBlockMeta()) {
+ buf.position(offset);
+ ByteBuff dup = buf.duplicate();
+ dup.limit(offset + header.getCellBlockMeta().getLength());
+ cellScanner = this.rpcServer.cellBlockBuilder.createCellScannerReusingBuffers(this.codec,
+ this.compressionCodec, dup);
+ }
+ } catch (Throwable t) {
+ InetSocketAddress address = this.rpcServer.getListenerAddress();
+ String msg = (address != null ? address : "(channel closed)") +
+ " is unable to read call parameter from client " + getHostAddress();
+ RpcServer.LOG.warn(msg, t);
+
+ this.rpcServer.metrics.exception(t);
+
+ // probably the hbase hadoop version does not match the running hadoop
+ // version
+ if (t instanceof LinkageError) {
+ t = new DoNotRetryIOException(t);
+ }
+ // If the method is not present on the server, do not retry.
+ if (t instanceof UnsupportedOperationException) {
+ t = new DoNotRetryIOException(t);
+ }
- ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
- totalRequestSize, null, 0, this.callCleanup);
- readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage());
- readParamsFailedCall.sendResponseIfReady();
- return;
- }
-
- int timeout = 0;
- if (header.hasTimeout() && header.getTimeout() > 0) {
- timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
- }
- ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner, totalRequestSize,
- this.addr, timeout, this.callCleanup);
+ ServerCall<?> readParamsFailedCall = createCall(id, this.service, null, null, null, null,
+ totalRequestSize, null, 0, this.callCleanup);
+ readParamsFailedCall.setResponse(null, null, t, msg + "; " + t.getMessage());
+ readParamsFailedCall.sendResponseIfReady();
+ return;
+ }
- if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
- this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
- this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
- call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
- "Call queue is full on " + this.rpcServer.server.getServerName() +
+ int timeout = 0;
+ if (header.hasTimeout() && header.getTimeout() > 0) {
+ timeout = Math.max(this.rpcServer.minClientRequestTimeout, header.getTimeout());
+ }
+ ServerCall<?> call = createCall(id, this.service, md, header, param, cellScanner,
+ totalRequestSize, this.addr, timeout, this.callCleanup);
+
+ if (!this.rpcServer.scheduler.dispatch(new CallRunner(this.rpcServer, call))) {
+ this.rpcServer.callQueueSizeInBytes.add(-1 * call.getSize());
+ this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
+ call.setResponse(null, null, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+ "Call queue is full on " + this.rpcServer.server.getServerName() +
", too many items queued ?");
- call.sendResponseIfReady();
+ call.sendResponseIfReady();
+ }
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 87561ba..11978ca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -20,21 +20,28 @@ package org.apache.hadoop.hbase.ipc;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.SERVICE;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newBlockingStub;
import static org.apache.hadoop.hbase.ipc.TestProtobufRpcServiceImpl.newStub;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.anyObject;
+import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -43,10 +50,12 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.StringUtils;
+import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -87,6 +96,10 @@ public abstract class AbstractTestIPC {
protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
+
+ @Rule
+ public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
+
/**
* Ensure we do not HAVE TO HAVE a codec.
*/
@@ -183,7 +196,7 @@ public abstract class AbstractTestIPC {
RpcServer rpcServer = createRpcServer(null, "testRpcServer",
Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(
SERVICE, null)), new InetSocketAddress("localhost", 0), CONF, scheduler);
- verify(scheduler).init((RpcScheduler.Context) anyObject());
+ verify(scheduler).init(any(RpcScheduler.Context.class));
try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
rpcServer.start();
verify(scheduler).start();
@@ -192,7 +205,7 @@ public abstract class AbstractTestIPC {
for (int i = 0; i < 10; i++) {
stub.echo(null, param);
}
- verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
+ verify(scheduler, times(10)).dispatch(any(CallRunner.class));
} finally {
rpcServer.stop();
verify(scheduler).stop();
@@ -427,4 +440,44 @@ public abstract class AbstractTestIPC {
}
}
+ /**
+ * Asserts that every span currently held by {@link #traceRule} has the same trace id as the
+ * first one. Fails with an assertion error if no span has been recorded at all.
+ */
+ private void assertSameTraceId() {
+ // Read the span list once so the reference id and the loop are taken from the same list.
+ List<SpanData> spans = traceRule.getSpans();
+ assertFalse("expected at least one recorded span", spans.isEmpty());
+ String traceId = spans.get(0).getTraceId();
+ for (SpanData data : spans) {
+ // every span must belong to the same trace as the first one
+ assertEquals(traceId, data.getTraceId());
+ }
+ }
+
+ /**
+ * Issues one successful call ({@code pause} for 100 ms) and one failing call ({@code error})
+ * against a freshly started server and checks the spans collected by {@link #traceRule}:
+ * <ul>
+ * <li>all spans collected for a call share one trace id,</li>
+ * <li>every span of the {@code pause} call lasts at least 100 ms and has status OK,</li>
+ * <li>every span of the {@code error} call has status ERROR.</li>
+ * </ul>
+ * The server is stopped in a {@code finally} clause so it does not outlive the test.
+ */
+ @Test
+ public void testTracing() throws IOException, ServiceException {
+ RpcServer rpcServer = createRpcServer(null, "testRpcServer",
+ Lists.newArrayList(new RpcServer.BlockingServiceAndInterface(SERVICE, null)),
+ new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+ try (AbstractRpcClient<?> client = createRpcClient(CONF)) {
+ rpcServer.start();
+ BlockingInterface stub = newBlockingStub(client, rpcServer.getListenerAddress());
+ stub.pause(null, PauseRequestProto.newBuilder().setMs(100).build());
+ // Wait (up to 1000 ms) until the client-side span of the pause call has been recorded.
+ Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName)
+ .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.pause")));
+
+ assertSameTraceId();
+ for (SpanData data : traceRule.getSpans()) {
+ // the call paused for 100 ms, so no recorded span may be shorter than that
+ assertThat(
+ TimeUnit.NANOSECONDS.toMillis(data.getEndEpochNanos() - data.getStartEpochNanos()),
+ greaterThanOrEqualTo(100L));
+ assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
+ }
+
+ // Start from a clean slate so only spans of the failing call are inspected below.
+ traceRule.clearSpans();
+ assertThrows(ServiceException.class,
+ () -> stub.error(null, EmptyRequestProto.getDefaultInstance()));
+ Waiter.waitFor(CONF, 1000, () -> traceRule.getSpans().stream().map(SpanData::getName)
+ .anyMatch(s -> s.equals("RpcClient.callMethod.TestProtobufRpcProto.error")));
+
+ assertSameTraceId();
+ for (SpanData data : traceRule.getSpans()) {
+ assertEquals(StatusCode.ERROR, data.getStatus().getStatusCode());
+ }
+ } finally {
+ // Runs after the client has been closed by try-with-resources.
+ rpcServer.stop();
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
index 2601fba..c3b52a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -39,6 +39,7 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
+
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.epoll.EpollSocketChannel;
diff --git a/pom.xml b/pom.xml
index 7803e88..77d12ac 100755
--- a/pom.xml
+++ b/pom.xml
@@ -1482,7 +1482,7 @@
<jruby.version>9.2.13.0</jruby.version>
<junit.version>4.13</junit.version>
<hamcrest.version>1.3</hamcrest.version>
- <opentelemetry.version>0.12.0</opentelemetry.version>
+ <opentelemetry.version>0.13.1</opentelemetry.version>
<log4j.version>1.2.17</log4j.version>
<mockito-core.version>2.28.2</mockito-core.version>
<!--Internally we use a different version of protobuf. See hbase-protocol-shaded-->
@@ -2179,6 +2179,16 @@
<version>${opentelemetry.version}</version>
</dependency>
<dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk</artifactId>
+ <version>${opentelemetry.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-testing</artifactId>
+ <version>${opentelemetry.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>${disruptor.version}</version>