You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ta...@apache.org on 2021/08/26 18:29:59 UTC
[hbase] branch HBASE-25853 updated: HBASE-26140 Backport
HBASE-25778 "The tracing implementation for AsyncConnectionImpl.getHbck is
incorrect" to branch-2 (#3631)
This is an automated email from the ASF dual-hosted git repository.
taklwu pushed a commit to branch HBASE-25853
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-25853 by this push:
new d807632 HBASE-26140 Backport HBASE-25778 "The tracing implementation for AsyncConnectionImpl.getHbck is incorrect" to branch-2 (#3631)
d807632 is described below
commit d80763268621590b0701ec3fa2518fab6834d1b0
Author: Tak Lon (Stephen) Wu <ta...@apache.org>
AuthorDate: Thu Aug 26 11:29:30 2021 -0700
HBASE-26140 Backport HBASE-25778 "The tracing implementation for AsyncConnectionImpl.getHbck is incorrect" to branch-2 (#3631)
17/17 commits of HBASE-22120, original commit f36e1539648bbaee84c626fd54d1605baebf3c5a
Co-authored-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Peter Somogyi <ps...@apache.org>
---
.../hadoop/hbase/client/AsyncConnectionImpl.java | 37 +++----
.../hbase/client/TestAsyncConnectionTracing.java | 112 +++++++++++++++++++++
.../org/apache/hadoop/hbase/trace/TraceUtil.java | 14 +++
3 files changed, 145 insertions(+), 18 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 4143173..1451b5b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -28,7 +28,6 @@ import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLE
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import io.opentelemetry.api.trace.Span;
-import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -37,6 +36,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
@@ -370,6 +370,15 @@ class AsyncConnectionImpl implements AsyncConnection {
RETRY_TIMER);
}
+ /**
+  * Builds an {@link Hbck} for the given master and records that master's server name as the
+  * {@code TraceUtil.SERVER_NAME_KEY} attribute of whatever span is current when this runs.
+  * <p>
+  * Creating a new protobuf stub does not create a new connection, and hbck has no performance
+  * requirements, so for simplicity a fresh stub is built on every call rather than cached.
+  * @param masterServer the master the returned {@link Hbck} will send its requests to
+  * @return a new {@link Hbck} wrapping a blocking stub for {@code masterServer}
+  */
+ private Hbck getHbckInternal(ServerName masterServer) {
+   Span.current().setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
+   MasterProtos.HbckService.BlockingInterface stub = MasterProtos.HbckService
+     .newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout));
+   return new HBaseHbck(stub, rpcControllerFactory);
+ }
+
@Override
public CompletableFuture<Hbck> getHbck() {
return TraceUtil.tracedFuture(() -> {
@@ -378,11 +387,7 @@ class AsyncConnectionImpl implements AsyncConnection {
if (error != null) {
future.completeExceptionally(error);
} else {
- try {
- future.complete(getHbck(sn));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
+ future.complete(getHbckInternal(sn));
}
});
return future;
@@ -390,18 +395,14 @@ class AsyncConnectionImpl implements AsyncConnection {
}
@Override
- public Hbck getHbck(ServerName masterServer) throws IOException {
- Span span = TraceUtil.createSpan("AsyncConnection.getHbck")
- .setAttribute(TraceUtil.SERVER_NAME_KEY, masterServer.getServerName());
- try (Scope scope = span.makeCurrent()) {
- // we will not create a new connection when creating a new protobuf stub, and for hbck there
- // will be no performance consideration, so for simplification we will create a new stub every
- // time instead of caching the stub here.
- return new HBaseHbck(
- MasterProtos.HbckService
- .newBlockingStub(rpcClient.createBlockingRpcChannel(masterServer, user, rpcTimeout)),
- rpcControllerFactory);
- }
+ public Hbck getHbck(ServerName masterServer) {
+   // Keep the action explicitly typed as a Supplier so the call below is resolved against the
+   // Supplier-based TraceUtil.trace method, exactly as an anonymous Supplier would be.
+   Supplier<Hbck> action = () -> getHbckInternal(masterServer);
+   // The span name is shared with the asynchronous getHbck() variant.
+   return TraceUtil.trace(action, "AsyncConnection.getHbck");
}
@Override
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java
new file mode 100644
index 0000000..fec5f6d
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import io.opentelemetry.api.trace.SpanKind;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule;
+import io.opentelemetry.sdk.trace.data.SpanData;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.trace.TraceUtil;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+/**
+ * Verifies the spans recorded for {@link AsyncConnection} operations. Each test invokes one
+ * operation on a connection backed by a stub registry and then inspects the spans collected by
+ * the {@link OpenTelemetryRule}.
+ */
+@Category({ ClientTests.class, MediumTests.class })
+public class TestAsyncConnectionTracing {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncConnectionTracing.class);
+
+  private static final Configuration CONF = HBaseConfiguration.create();
+
+  // The server name that the stub registry reports as the active master.
+  private final ServerName masterServer =
+    ServerName.valueOf("localhost", 12345, System.currentTimeMillis());
+
+  private AsyncConnection conn;
+
+  @Rule
+  public OpenTelemetryRule traceRule = OpenTelemetryRule.create();
+
+  @Before
+  public void setUp() throws IOException {
+    // Registry stub whose only overridden lookup is the active master, which is always
+    // masterServer.
+    ConnectionRegistry registry = new DoNothingConnectionRegistry(CONF) {
+
+      @Override
+      public CompletableFuture<ServerName> getActiveMaster() {
+        return CompletableFuture.completedFuture(masterServer);
+      }
+    };
+    conn = new AsyncConnectionImpl(CONF, registry, "test",
+      UserProvider.instantiate(CONF).getCurrent());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    // testClose() has already closed the connection by the time this runs for that test.
+    Closeables.close(conn, true);
+  }
+
+  /**
+   * Tells whether {@code span} is the finished span a test is looking for.
+   * @param span the recorded span to inspect
+   * @param spanName the full span name that is expected
+   * @return true if the span has the given name, is of kind INTERNAL and has ended
+   */
+  private static boolean isFinishedSpan(SpanData span, String spanName) {
+    return span.getName().equals(spanName) && span.getKind() == SpanKind.INTERNAL &&
+      span.hasEnded();
+  }
+
+  /**
+   * Waits until a finished INTERNAL span named {@code "AsyncConnection." + methodName} has been
+   * recorded, then asserts that its status is OK and, when {@code serverName} is not null, that
+   * its server name attribute equals {@code serverName.getServerName()}.
+   * @param methodName the method name that follows the "AsyncConnection." prefix in the span name
+   * @param serverName the expected server name attribute, or null to skip the attribute check
+   */
+  private void assertTrace(String methodName, ServerName serverName) {
+    String spanName = "AsyncConnection." + methodName;
+    Waiter.waitFor(CONF, 1000,
+      () -> traceRule.getSpans().stream().anyMatch(span -> isFinishedSpan(span, spanName)));
+    // Select the span with the same criteria that were waited on, so the assertions below always
+    // look at the span that satisfied the wait and not at another span that shares its name.
+    SpanData data = traceRule.getSpans().stream()
+      .filter(span -> isFinishedSpan(span, spanName)).findFirst()
+      .orElseThrow(() -> new AssertionError("No finished INTERNAL span named " + spanName));
+    assertEquals(StatusCode.OK, data.getStatus().getStatusCode());
+    if (serverName != null) {
+      assertEquals(serverName.getServerName(), data.getAttributes().get(TraceUtil.SERVER_NAME_KEY));
+    }
+  }
+
+  @Test
+  public void testHbck() {
+    conn.getHbck().join();
+    assertTrace("getHbck", masterServer);
+  }
+
+  @Test
+  public void testHbckWithServerName() throws IOException {
+    ServerName serverName = ServerName.valueOf("localhost", 23456, System.currentTimeMillis());
+    conn.getHbck(serverName);
+    assertTrace("getHbck", serverName);
+  }
+
+  @Test
+  public void testClose() throws IOException {
+    conn.close();
+    assertTrace("close", null);
+  }
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
index 43c2e04..fb37080 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/trace/TraceUtil.java
@@ -189,6 +189,20 @@ public final class TraceUtil {
}
}
+ public static <T> T trace(Supplier<T> action, String spanName) {
+ Span span = createSpan(spanName);
+ try (Scope scope = span.makeCurrent()) {
+ T ret = action.get();
+ span.setStatus(StatusCode.OK);
+ return ret;
+ } catch (Throwable e) {
+ setError(span, e);
+ throw e;
+ } finally {
+ span.end();
+ }
+ }
+
@FunctionalInterface
public interface IOExceptionCallable<V> {
V call() throws IOException;