You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@skywalking.apache.org by wu...@apache.org on 2018/10/29 06:52:29 UTC

[incubator-skywalking] 01/01: Support maxConcurrentCallsPerConnection and maxMessageSize in grpcServer

This is an automated email from the ASF dual-hosted git repository.

wusheng pushed a commit to branch optimization
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git

commit 67e816ab49d5ba40bd0ad7fbd18e71c900dcd539
Author: Wu Sheng <wu...@foxmail.com>
AuthorDate: Mon Oct 29 14:52:20 2018 +0800

    Support maxConcurrentCallsPerConnection and maxMessageSize in grpcServer
---
 .../oap/server/core/CoreModuleConfig.java          |  2 +
 .../oap/server/core/CoreModuleProvider.java        | 60 ++++++++++++++++++----
 .../oap/server/library/server/grpc/GRPCServer.java | 13 +++++
 3 files changed, 64 insertions(+), 11 deletions(-)

diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
index 62a329e..109bb73 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleConfig.java
@@ -32,6 +32,8 @@ public class CoreModuleConfig extends ModuleConfig {
     @Setter private String restContextPath;
     @Setter private String gRPCHost;
     @Setter private int gRPCPort;
+    @Setter private int maxConcurrentCallsPerConnection;
+    @Setter private int maxMessageSize;
     private final List<String> downsampling;
     @Setter private int recordDataTTL;
     @Setter private int minuteMetricsDataTTL;
diff --git a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
index 698ec73..b116fec 100644
--- a/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
+++ b/oap-server/server-core/src/main/java/org/apache/skywalking/oap/server/core/CoreModuleProvider.java
@@ -22,26 +22,58 @@ import java.io.IOException;
 import org.apache.skywalking.oap.server.core.analysis.indicator.annotation.IndicatorTypeListener;
 import org.apache.skywalking.oap.server.core.analysis.record.annotation.RecordTypeListener;
 import org.apache.skywalking.oap.server.core.annotation.AnnotationScan;
-import org.apache.skywalking.oap.server.core.cache.*;
-import org.apache.skywalking.oap.server.core.cluster.*;
-import org.apache.skywalking.oap.server.core.config.*;
-import org.apache.skywalking.oap.server.core.query.*;
+import org.apache.skywalking.oap.server.core.cache.CacheUpdateTimer;
+import org.apache.skywalking.oap.server.core.cache.EndpointInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.NetworkAddressInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInstanceInventoryCache;
+import org.apache.skywalking.oap.server.core.cache.ServiceInventoryCache;
+import org.apache.skywalking.oap.server.core.cluster.ClusterModule;
+import org.apache.skywalking.oap.server.core.cluster.ClusterRegister;
+import org.apache.skywalking.oap.server.core.cluster.RemoteInstance;
+import org.apache.skywalking.oap.server.core.config.ComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.config.DownsamplingConfigService;
+import org.apache.skywalking.oap.server.core.config.IComponentLibraryCatalogService;
+import org.apache.skywalking.oap.server.core.query.AggregationQueryService;
+import org.apache.skywalking.oap.server.core.query.AlarmQueryService;
+import org.apache.skywalking.oap.server.core.query.MetadataQueryService;
+import org.apache.skywalking.oap.server.core.query.MetricQueryService;
+import org.apache.skywalking.oap.server.core.query.TopologyQueryService;
+import org.apache.skywalking.oap.server.core.query.TraceQueryService;
 import org.apache.skywalking.oap.server.core.register.annotation.InventoryTypeListener;
-import org.apache.skywalking.oap.server.core.register.service.*;
-import org.apache.skywalking.oap.server.core.remote.*;
-import org.apache.skywalking.oap.server.core.remote.annotation.*;
+import org.apache.skywalking.oap.server.core.register.service.EndpointInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IEndpointInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.INetworkAddressInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.IServiceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.NetworkAddressInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.ServiceInstanceInventoryRegister;
+import org.apache.skywalking.oap.server.core.register.service.ServiceInventoryRegister;
+import org.apache.skywalking.oap.server.core.remote.RemoteSenderService;
+import org.apache.skywalking.oap.server.core.remote.RemoteServiceHandler;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamAnnotationListener;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataAnnotationContainer;
+import org.apache.skywalking.oap.server.core.remote.annotation.StreamDataClassGetter;
 import org.apache.skywalking.oap.server.core.remote.client.RemoteClientManager;
-import org.apache.skywalking.oap.server.core.server.*;
-import org.apache.skywalking.oap.server.core.source.*;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.GRPCHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegister;
+import org.apache.skywalking.oap.server.core.server.JettyHandlerRegisterImpl;
+import org.apache.skywalking.oap.server.core.source.SourceReceiver;
+import org.apache.skywalking.oap.server.core.source.SourceReceiverImpl;
 import org.apache.skywalking.oap.server.core.storage.PersistenceTimer;
 import org.apache.skywalking.oap.server.core.storage.annotation.StorageAnnotationListener;
 import org.apache.skywalking.oap.server.core.storage.model.IModelGetter;
 import org.apache.skywalking.oap.server.core.storage.ttl.DataTTLKeeperTimer;
-import org.apache.skywalking.oap.server.library.module.*;
+import org.apache.skywalking.oap.server.library.module.ModuleConfig;
+import org.apache.skywalking.oap.server.library.module.ModuleDefine;
+import org.apache.skywalking.oap.server.library.module.ModuleProvider;
+import org.apache.skywalking.oap.server.library.module.ModuleStartException;
+import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
 import org.apache.skywalking.oap.server.library.server.ServerException;
 import org.apache.skywalking.oap.server.library.server.grpc.GRPCServer;
 import org.apache.skywalking.oap.server.library.server.jetty.JettyServer;
-import org.slf4j.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * @author peng-yongsheng
@@ -82,6 +114,12 @@ public class CoreModuleProvider extends ModuleProvider {
 
     @Override public void prepare() throws ServiceNotProvidedException {
         grpcServer = new GRPCServer(moduleConfig.getGRPCHost(), moduleConfig.getGRPCPort());
+        if (moduleConfig.getMaxConcurrentCallsPerConnection() > 0) {
+            grpcServer.setMaxConcurrentCallsPerConnection(moduleConfig.getMaxConcurrentCallsPerConnection());
+        }
+        if (moduleConfig.getMaxMessageSize() > 0) {
+            grpcServer.setMaxMessageSize(moduleConfig.getMaxMessageSize());
+        }
         grpcServer.initialize();
 
         jettyServer = new JettyServer(moduleConfig.getRestHost(), moduleConfig.getRestPort(), moduleConfig.getRestContextPath());
diff --git a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
index 0bd15e6..8314829 100644
--- a/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
+++ b/oap-server/server-library/library-server/src/main/java/org/apache/skywalking/oap/server/library/server/grpc/GRPCServer.java
@@ -37,6 +37,8 @@ public class GRPCServer implements Server {
 
     private final String host;
     private final int port;
+    private int maxConcurrentCallsPerConnection;
+    private int maxMessageSize;
     private io.grpc.Server server;
     private NettyServerBuilder nettyServerBuilder;
     private SslContextBuilder sslContextBuilder;
@@ -46,6 +48,16 @@ public class GRPCServer implements Server {
     public GRPCServer(String host, int port) {
         this.host = host;
         this.port = port;
+        this.maxConcurrentCallsPerConnection = 4;
+        this.maxMessageSize = Integer.MAX_VALUE;
+    }
+
+    public void setMaxConcurrentCallsPerConnection(int maxConcurrentCallsPerConnection) {
+        this.maxConcurrentCallsPerConnection = maxConcurrentCallsPerConnection;
+    }
+
+    public void setMaxMessageSize(int maxMessageSize) {
+        this.maxMessageSize = maxMessageSize;
     }
 
     /**
@@ -79,6 +91,7 @@ public class GRPCServer implements Server {
     public void initialize() {
         InetSocketAddress address = new InetSocketAddress(host, port);
         nettyServerBuilder = NettyServerBuilder.forAddress(address);
+        nettyServerBuilder = nettyServerBuilder.maxConcurrentCallsPerConnection(maxConcurrentCallsPerConnection).maxMessageSize(maxMessageSize);
         logger.info("Server started, host {} listening on {}", host, port);
     }