You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by ta...@apache.org on 2020/08/29 13:37:55 UTC

[skywalking] 01/01: fix auth in sharing server.

This is an automated email from the ASF dual-hosted git repository.

tanjian pushed a commit to branch auth_fix
in repository https://gitbox.apache.org/repos/asf/skywalking.git

commit ca2adf3e12abc6e5b2cb657e61e9219f0d7306fb
Author: JaredTan95 <ji...@daocloud.io>
AuthorDate: Sat Aug 29 21:35:09 2020 +0800

    fix auth in sharing server.
---
 .../core/server/GRPCHandlerRegisterImpl.java       |  2 +-
 .../oap/server/library/server/grpc/GRPCServer.java | 33 ++++++++++++++--------
 .../server/SharingServerModuleProvider.java        | 15 ++++++++--
 3 files changed, 34 insertions(+), 16 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
index 2080701..f75f355 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/server/GRPCHandlerRegisterImpl.java
@@ -43,6 +43,6 @@ public class GRPCHandlerRegisterImpl implements GRPCHandlerRegister {
 
     @Override
     public void addFilter(ServerInterceptor interceptor) {
-
+        server.addHandler(interceptor);
     }
 }
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
index d2eb9ac..2c0dd6c 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
@@ -19,6 +19,7 @@
 package org.apache.skywalking.oap.server.library.server.grpc;
 
 import io.grpc.BindableService;
+import io.grpc.ServerInterceptor;
 import io.grpc.ServerServiceDefinition;
 import io.grpc.netty.GrpcSslContexts;
 import io.grpc.netty.NettyServerBuilder;
@@ -33,15 +34,13 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.skywalking.oap.server.library.server.Server;
 import org.apache.skywalking.oap.server.library.server.ServerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
+@Slf4j
 public class GRPCServer implements Server {
 
-    private static final Logger LOGGER = LoggerFactory.getLogger(GRPCServer.class);
-
     private final String host;
     private final int port;
     private int maxConcurrentCallsPerConnection;
@@ -104,19 +103,22 @@ public class GRPCServer implements Server {
     public void initialize() {
         InetSocketAddress address = new InetSocketAddress(host, port);
         ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(threadPoolQueueSize);
-        ExecutorService executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue, new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler());
+        ExecutorService executor = new ThreadPoolExecutor(
+            threadPoolSize, threadPoolSize, 60, TimeUnit.SECONDS, blockingQueue,
+            new CustomThreadFactory("grpcServerPool"), new CustomRejectedExecutionHandler()
+        );
         nettyServerBuilder = NettyServerBuilder.forAddress(address);
         nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection)
                                                .maxInboundMessageSize(maxMessageSize)
                                                .executor(executor);
-        LOGGER.info("Server started, host {} listening on {}", host, port);
+        log.info("Server started, host {} listening on {}", host, port);
     }
 
     static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
 
         @Override
         public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
-            LOGGER.warn("Grpc server thread pool is full, rejecting the task");
+            log.warn("Grpc server thread pool is full, rejecting the task");
         }
     }
 
@@ -124,8 +126,9 @@ public class GRPCServer implements Server {
     public void start() throws ServerException {
         try {
             if (sslContextBuilder != null) {
-                nettyServerBuilder = nettyServerBuilder.sslContext(GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL)
-                                                                                  .build());
+                nettyServerBuilder = nettyServerBuilder.sslContext(
+                    GrpcSslContexts.configure(sslContextBuilder, SslProvider.OPENSSL)
+                                   .build());
             }
             server = nettyServerBuilder.build();
             server.start();
@@ -135,15 +138,20 @@ public class GRPCServer implements Server {
     }
 
     public void addHandler(BindableService handler) {
-        LOGGER.info("Bind handler {} into gRPC server {}:{}", handler.getClass().getSimpleName(), host, port);
+        log.info("Bind handler {} into gRPC server {}:{}", handler.getClass().getSimpleName(), host, port);
         nettyServerBuilder.addService(handler);
     }
 
     public void addHandler(ServerServiceDefinition definition) {
-        LOGGER.info("Bind handler {} into gRPC server {}:{}", definition.getClass().getSimpleName(), host, port);
+        log.info("Bind handler {} into gRPC server {}:{}", definition.getClass().getSimpleName(), host, port);
         nettyServerBuilder.addService(definition);
     }
 
+    public void addHandler(ServerInterceptor serverInterceptor) {
+        log.info("Bind interceptor {} into gRPC server {}:{}", serverInterceptor.getClass().getSimpleName(), host, port);
+        nettyServerBuilder.intercept(serverInterceptor);
+    }
+
     @Override
     public boolean isSSLOpen() {
         return sslContextBuilder == null;
@@ -156,7 +164,8 @@ public class GRPCServer implements Server {
         if (target == null || getClass() != target.getClass())
             return false;
         GRPCServer that = (GRPCServer) target;
-        return port == that.port && Objects.equals(host, that.host) && Objects.equals(certChainFile, that.certChainFile) && Objects
+        return port == that.port && Objects.equals(host, that.host) && Objects.equals(
+            certChainFile, that.certChainFile) && Objects
             .equals(privateKeyFile, that.privateKeyFile);
     }
 }
diff --git a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
index 5091b7b..7c3fabf 100644
--- a/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
+++ b/oap-server/server-receiver-plugin/skywalking-sharing-server-plugin/src/main/java/org/apache/skywalking/oap/server/receiver/sharing/server/SharingServerModuleProvider.java
@@ -45,6 +45,7 @@ public class SharingServerModuleProvider extends ModuleProvider {
     private JettyServer jettyServer;
     private ReceiverGRPCHandlerRegister receiverGRPCHandlerRegister;
     private ReceiverJettyHandlerRegister receiverJettyHandlerRegister;
+    private AuthenticationInterceptor authenticationInterceptor;
 
     public SharingServerModuleProvider() {
         super();
@@ -92,6 +93,10 @@ public class SharingServerModuleProvider extends ModuleProvider {
             this.registerServiceImplementation(JettyHandlerRegister.class, receiverJettyHandlerRegister);
         }
 
+        if (StringUtil.isNotEmpty(config.getAuthentication())) {
+            authenticationInterceptor = new AuthenticationInterceptor(config.getAuthentication());
+        }
+
         if (config.getGRPCPort() != 0) {
             if (config.isGRPCSslEnabled()) {
                 grpcServer = new GRPCServer(
@@ -120,11 +125,15 @@ public class SharingServerModuleProvider extends ModuleProvider {
             }
             grpcServer.initialize();
 
-            this.registerServiceImplementation(GRPCHandlerRegister.class, new GRPCHandlerRegisterImpl(grpcServer));
+            GRPCHandlerRegisterImpl grpcHandlerRegister = new GRPCHandlerRegisterImpl(grpcServer);
+            if (Objects.nonNull(authenticationInterceptor)) {
+                grpcHandlerRegister.addFilter(authenticationInterceptor);
+            }
+            this.registerServiceImplementation(GRPCHandlerRegister.class, grpcHandlerRegister);
         } else {
             this.receiverGRPCHandlerRegister = new ReceiverGRPCHandlerRegister();
-            if (StringUtil.isNotEmpty(config.getAuthentication())) {
-                receiverGRPCHandlerRegister.addFilter(new AuthenticationInterceptor(config.getAuthentication()));
+            if (Objects.nonNull(authenticationInterceptor)) {
+                receiverGRPCHandlerRegister.addFilter(authenticationInterceptor);
             }
             this.registerServiceImplementation(GRPCHandlerRegister.class, receiverGRPCHandlerRegister);
         }