You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ta...@apache.org on 2020/08/29 13:37:55 UTC
[skywalking] 01/01: fix auth in sharing server.
This is an automated email from the ASF dual-hosted git repository.
tanjian pushed a commit to branch auth_fix
in repository https://gitbox.apache.org/repos/asf/skywalking.git
commit ca2adf3e12abc6e5b2cb657e61e9219f0d7306fb
Author: JaredTan95 <ji...@daocloud.io>
AuthorDate: Sat Aug 29 21:35:09 2020 +0800
fix auth in sharing server.
---
.../core/server/GRPCHandlerRegisterImpl.java | 2 +-
.../oap/server/library/server/grpc/GRPCServer.java | 33 ++++++++++++++--------
.../server/SharingServerModuleProvider.java | 15 ++++++++--
3 files changed, 34 insertions(+), 16 deletions(-)
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
index 2080701..f75f355 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
@@ -43,6 +43,6 @@ public class GRPCHandlerRegisterImpl implements GRPCHandlerRegister {
@Override
public void addFilter(ServerInterceptor interceptor) {
-
+ server.addHandler(interceptor);
}
}
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
index d2eb9ac..2c0dd6c 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
@@ -19,6 +19,7 @@
package org.apache.skywalking.oap.server.library.server.grpc;
import io.grpc.BindableService;
+import io.grpc.ServerInterceptor;
import io.grpc.ServerServiceDefinition;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
@@ -33,15 +34,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
import org.apache.skywalking.oap.server.library.server.Server;
import org.apache.skywalking.oap.server.library.server.ServerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+@Slf4j
public class GRPCServer implements Server {
- private static final Logger LOGGER = LoggerFactory.getLogger(GRPCServer.class);
-
private final String host;
private final int port;
private int maxConcurrentCallsPerConnection;
@@ -104,19 +103,22 @@ public class GRPCServer implements Server {
public void initialize() {
InetSocketAddress address = new InetSocketAddress(host, port);
ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(threadPoolQueueSize);
- ExecutorService executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue, new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler());
+ ExecutorService executor = new ThreadPoolExecutor(
+ threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue,
+ new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler()
+ );
nettyServerBuilder = NettyServerBuilder.forAddress(address);
nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection)
.maxInboundMessageSize(maxMessageSize)
.executor(executor);
- LOGGER.info("Server started, host {} listening on {}", host, port);
+ log.info("Server started, host {} listening on {}", host, port);
}
static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
- LOGGER.warn("Grpc server thread pool is full, rejecting the task");
+ log.warn("Grpc server thread pool is full, rejecting the task");
}
}
@@ -124,8 +126,9 @@ public class GRPCServer implements Server {
public void start() throws ServerException {
try {
if (sslContextBuilder != null) {
- nettyServerBuilder = nettyServerBuilder.sslContext(GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL)
- .build());
+ nettyServerBuilder = nettyServerBuilder.sslContext(
+ GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL)
+ .build());
}
server = nettyServerBuilder.build();
server.start();
@@ -135,15 +138,20 @@ public class GRPCServer implements Server {
}
public void addHandler(BindableService handler) {
- LOGGER.info("Bind handler {} into gRPC server {}:{}", handler.getClass().getSimpleName(), host, port);
+ log.info("Bind handler {} into gRPC server {}:{}", handler.getClass().getSimpleName(), host, port);
nettyServerBuilder.addService(handler);
}
public void addHandler(ServerServiceDefinition definition) {
- LOGGER.info("Bind handler {} into gRPC server {}:{}", definition.getClass().getSimpleName(), host, port);
+ log.info("Bind handler {} into gRPC server {}:{}", definition.getClass().getSimpleName(), host, port);
nettyServerBuilder.addService(definition);
}
+ public void addHandler(ServerInterceptor serverInterceptor) {
+ log.info("Bind interceptor {} into gRPC server {}:{}", serverInterceptor.getClass().getSimpleName(), host, port);
+ nettyServerBuilder.intercept(serverInterceptor);
+ }
+
@Override
public boolean isSSLOpen() {
return sslContextBuilder == null;
@@ -156,7 +164,8 @@ public class GRPCServer implements Server {
if (target == null || getClass() != target.getClass())
return false;
GRPCServer that = (GRPCServer) target;
- return port == that.port && Objects.equals(host, that.host) && Objects.equals(certChainFile, that.certChainFile) && Objects
+ return port == that.port && Objects.equals(host, that.host) && Objects.equals(
+ certChainFile, that.certChainFile) && Objects
.equals(privateKeyFile, that.privateKeyFile);
}
}
diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
index 5091b7b..7c3fabf 100644
--- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
@@ -45,6 +45,7 @@ public class SharingServerModuleProvider extends ModuleProvider {
private JettyServer jettyServer;
private ReceiverGRPCHandlerRegister receiverGRPCHandlerRegister;
private ReceiverJettyHandlerRegister receiverJettyHandlerRegister;
+ private AuthenticationInterceptor authenticationInterceptor;
public SharingServerModuleProvider() {
super();
@@ -92,6 +93,10 @@ public class SharingServerModuleProvider extends ModuleProvider {
this.registerServiceImplementation(JettyHandlerRegister.class, receiverJettyHandlerRegister);
}
+ if (StringUtil.isNotEmpty(config.getAuthentication())) {
+ authenticationInterceptor = new AuthenticationInterceptor(config.getAuthentication());
+ }
+
if (config.getGRPCPort() != 0) {
if (config.isGRPCSslEnabled()) {
grpcServer = new GRPCServer(
@@ -120,11 +125,15 @@ public class SharingServerModuleProvider extends ModuleProvider {
}
grpcServer.initialize();
- this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
+ GRPCHandlerRegisterImpl grpcHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer);
+ if (Objects.nonNull(authenticationInterceptor)) {
+ grpcHandlerRegister.addFilter(authenticationInterceptor);
+ }
+ this.registerServiceImplementation(GRPCHandlerRegister.class, grpcHandlerRegister);
} else {
this.receiverGRPCHandlerRegister = new ReceiverGRPCHandlerRegister();
- if (StringUtil.isNotEmpty(config.getAuthentication())) {
- receiverGRPCHandlerRegister.addFilter(new AuthenticationInterceptor(config.getAuthentication()));
+ if (Objects.nonNull(authenticationInterceptor)) {
+ receiverGRPCHandlerRegister.addFilter(authenticationInterceptor);
}
this.registerServiceImplementation(GRPCHandlerRegister.class, receiverGRPCHandlerRegister);
}