You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@skywalking.apache.org by zh...@apache.org on 2018/03/31 12:06:50 UTC
[incubator-skywalking] branch master updated: Support token auth in
gRPC upstream (#1000)
This is an automated email from the ASF dual-hosted git repository.
zhangxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-skywalking.git
The following commit(s) were added to refs/heads/master by this push:
new d322f52 Support token auth in gRPC upstream (#1000)
d322f52 is described below
commit d322f52843c3dcbade6174bd1818eb345189dfe6
Author: 吴晟 Wu Sheng <wu...@foxmail.com>
AuthorDate: Sat Mar 31 20:06:48 2018 +0800
Support token auth in gRPC upstream (#1000)
* Add client side token.
* Fix ci.
* Fix most codes of token auth at both sides.
* Add settings in config files.
* Make auth interceptor not throw exception.
* Update AuthenticationSimpleChecker.java
* Update AuthenticationActivator.java
* Revert "Update AuthenticationActivator.java"
This reverts commit 0935f18fd6802ab84c12e0609c21ef753ff51be3.
* Merge branch 'feature/token-auth' of https://github.com/apache/incubator-skywalking into feature/token-auth
# Conflicts:
# apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AuthenticationSimpleChecker.java
* Fix Authentication token working incorrectly (#1001)
* Fix Authentication token working incorrectly
* Change way to build GRPC Channel
* Response an empty header when auth fails.
---
.../grpc/provider/AgentModuleGRPCProvider.java | 34 ++++---
.../grpc/provider/AuthenticationSimpleChecker.java | 70 ++++++++++++++
.../jetty/provider/AgentModuleJettyProvider.java | 15 ++-
.../src/main/resources/application.yml | 3 +
.../skywalking/apm/collector/server/Server.java | 2 -
.../apm/collector/server/grpc/GRPCServer.java | 13 ++-
.../apm/collector/server/jetty/JettyServer.java | 29 +++---
.../grpc/manager/service/GRPCManagerService.java | 6 +-
.../manager/service/GRPCManagerServiceImpl.java | 7 +-
.../jetty/manager/service/JettyManagerService.java | 8 +-
.../manager/service/JettyManagerServiceImpl.java | 12 +--
.../service/NamingJettyHandlerRegisterService.java | 11 ++-
.../remote/grpc/RemoteModuleGRPCProvider.java | 13 +--
.../collector/ui/jetty/UIModuleJettyProvider.java | 9 +-
.../skywalking/apm/agent/core/conf/Config.java | 6 ++
.../skywalking/apm/agent/core/jvm/JVMService.java | 15 +--
.../core/remote/AppAndServiceRegisterClient.java | 23 ++---
.../agent/core/remote/AuthenticationDecorator.java | 62 +++++++++++++
.../apm/agent/core/remote/ChannelBuilder.java | 22 +----
.../apm/agent/core/remote/ChannelDecorator.java | 22 +----
.../apm/agent/core/remote/GRPCChannel.java | 103 +++++++++++++++++++++
.../apm/agent/core/remote/GRPCChannelManager.java | 58 ++++++------
.../agent/core/remote/StandardChannelBuilder.java | 22 +++--
.../apm/agent/core/remote/TLSChannelBuilder.java | 29 ++----
.../core/remote/TraceSegmentServiceClient.java | 15 +--
apm-sniffer/config/agent.config | 3 +
26 files changed, 410 insertions(+), 202 deletions(-)
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AgentModuleGRPCProvider.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AgentModuleGRPCProvider.java
index 2444182..c0bc726 100644
--- a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AgentModuleGRPCProvider.java
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AgentModuleGRPCProvider.java
@@ -18,16 +18,8 @@
package org.apache.skywalking.apm.collector.agent.grpc.provider;
-import java.io.File;
-import java.util.Properties;
-
import org.apache.skywalking.apm.collector.agent.grpc.define.AgentGRPCModule;
-import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ApplicationRegisterServiceHandler;
-import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.InstanceDiscoveryServiceHandler;
-import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.JVMMetricsServiceHandler;
-import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.NetworkAddressRegisterServiceHandler;
-import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.ServiceNameDiscoveryServiceHandler;
-import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.TraceSegmentServiceHandler;
+import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.*;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingHandler;
import org.apache.skywalking.apm.collector.agent.grpc.provider.handler.naming.AgentGRPCNamingListener;
import org.apache.skywalking.apm.collector.analysis.metric.define.AnalysisMetricModule;
@@ -42,9 +34,12 @@ import org.apache.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
import org.apache.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
-import org.apache.skywalking.apm.collector.server.Server;
+import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import org.eclipse.jetty.util.StringUtil;
+import java.io.File;
+import java.util.Properties;
+
/**
* @author peng-yongsheng
*/
@@ -55,6 +50,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
private static final String PORT = "port";
private static final String SSL_CERT_CHAIN_FILEPATH = "ssl_cert_chain_file";
private static final String SSL_PRIVATE_KEY_FILE = "ssl_private_key_file";
+ private static final String AUTHENTICATION = "authentication";
@Override
public String name() {
@@ -77,6 +73,8 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
Integer port = (Integer) config.get(PORT);
String sslCertChainFilePath = config.getProperty(SSL_CERT_CHAIN_FILEPATH);
String sslPrivateKeyFilePath = config.getProperty(SSL_PRIVATE_KEY_FILE);
+
+ AuthenticationSimpleChecker.INSTANCE.setExpectedToken(config.getProperty(AUTHENTICATION, ""));
File sslCertChainFile = null;
File sslPrivateKeyFile = null;
if (StringUtil.isNotBlank(sslCertChainFilePath)) {
@@ -103,7 +101,7 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
namingHandlerRegisterService.register(new AgentGRPCNamingHandler(namingListener));
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
- Server gRPCServer;
+ GRPCServer gRPCServer;
if (sslCertChainFile != null && sslPrivateKeyFile != null) {
gRPCServer = managerService.createIfAbsent(host, port, sslCertChainFile, sslPrivateKeyFile);
} else {
@@ -123,12 +121,12 @@ public class AgentModuleGRPCProvider extends ModuleProvider {
return new String[]{ClusterModule.NAME, NamingModule.NAME, GRPCManagerModule.NAME, AnalysisSegmentParserModule.NAME, AnalysisMetricModule.NAME};
}
- private void addHandlers(Server gRPCServer) {
- gRPCServer.addHandler(new ApplicationRegisterServiceHandler(getManager()));
- gRPCServer.addHandler(new InstanceDiscoveryServiceHandler(getManager()));
- gRPCServer.addHandler(new ServiceNameDiscoveryServiceHandler(getManager()));
- gRPCServer.addHandler(new JVMMetricsServiceHandler(getManager()));
- gRPCServer.addHandler(new TraceSegmentServiceHandler(getManager()));
- gRPCServer.addHandler(new NetworkAddressRegisterServiceHandler(getManager()));
+ private void addHandlers(GRPCServer gRPCServer) {
+ AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new ApplicationRegisterServiceHandler(getManager()));
+ AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new InstanceDiscoveryServiceHandler(getManager()));
+ AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new ServiceNameDiscoveryServiceHandler(getManager()));
+ AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new JVMMetricsServiceHandler(getManager()));
+ AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new TraceSegmentServiceHandler(getManager()));
+ AuthenticationSimpleChecker.INSTANCE.build(gRPCServer, new NetworkAddressRegisterServiceHandler(getManager()));
}
}
diff --git a/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AuthenticationSimpleChecker.java b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AuthenticationSimpleChecker.java
new file mode 100644
index 0000000..10e69da
--- /dev/null
+++ b/apm-collector/apm-collector-agent/agent-grpc/agent-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/agent/grpc/provider/AuthenticationSimpleChecker.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.collector.agent.grpc.provider;
+
+import io.grpc.BindableService;
+import io.grpc.Metadata;
+import io.grpc.ServerCall;
+import io.grpc.ServerCallHandler;
+import io.grpc.ServerInterceptor;
+import io.grpc.ServerInterceptors;
+import io.grpc.Status;
+import org.apache.skywalking.apm.collector.core.util.StringUtils;
+import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
+
+/**
+ * Activate the authentication token checker if expected token exists in application.yml
+ *
+ * Singleton (enum) helper that registers gRPC services on a {@link GRPCServer}, optionally
+ * guarding them with a token check.
+ *
+ * {@link #build(GRPCServer, BindableService)} behaves as follows:
+ * - When the expected token is non-empty (per StringUtils.isNotEmpty) at the time build is
+ *   called, the target service is wrapped with a ServerInterceptor before being added to the
+ *   server. For every call the interceptor reads the "Authentication" ASCII metadata header
+ *   and compares it to the expected token with String.equals. On a match the call is passed
+ *   on to the next handler; otherwise the call is closed with Status.PERMISSION_DENIED and an
+ *   empty Metadata, and an empty listener is returned, so no exception is thrown from the
+ *   interceptor.
+ * - When the expected token is empty, the target service is added to the server directly,
+ *   without any interceptor.
+ *
+ * The expected token defaults to the empty string and is replaced through
+ * {@link #setExpectedToken(String)}. The wrap/no-wrap decision is taken when build runs,
+ * while the interceptor reads the expectedToken field on each call, so the token should be
+ * set before build is invoked.
+ *
+ * NOTE(review): the comparison is a plain String.equals (not constant-time), the
+ * expectedToken field is a plain non-volatile field, and the rejected-call listener is
+ * created with a raw ServerCall.Listener type - confirm these are acceptable.
+ *
+ * @author wusheng
+ */
+public enum AuthenticationSimpleChecker {
+ INSTANCE;
+
+ private static final Metadata.Key<String> AUTH_HEAD_HEADER_NAME =
+ Metadata.Key.of("Authentication", Metadata.ASCII_STRING_MARSHALLER);
+
+ private String expectedToken = "";
+
+ public void build(GRPCServer gRPCServer, BindableService targetService) {
+ if (StringUtils.isNotEmpty(expectedToken)) {
+ gRPCServer.addHandler(ServerInterceptors.intercept(targetService, new ServerInterceptor() {
+ @Override
+ public <REQ, RESP> ServerCall.Listener<REQ> interceptCall(ServerCall<REQ, RESP> serverCall,
+ Metadata metadata,
+ ServerCallHandler<REQ, RESP> next) {
+ String token = metadata.get(AUTH_HEAD_HEADER_NAME);
+ if (expectedToken.equals(token)) {
+ return next.startCall(serverCall, metadata);
+ } else {
+ serverCall.close(Status.PERMISSION_DENIED, new Metadata());
+ return new ServerCall.Listener() {
+ };
+ }
+
+ }
+ }));
+ } else {
+ gRPCServer.addHandler(targetService);
+ }
+ }
+
+ public void setExpectedToken(String expectedToken) {
+ this.expectedToken = expectedToken;
+ }
+}
diff --git a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/AgentModuleJettyProvider.java b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/AgentModuleJettyProvider.java
index 0deed78..cf87b91 100644
--- a/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/AgentModuleJettyProvider.java
+++ b/apm-collector/apm-collector-agent/agent-jetty/agent-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/agent/jetty/provider/AgentModuleJettyProvider.java
@@ -18,13 +18,8 @@
package org.apache.skywalking.apm.collector.agent.jetty.provider;
-import java.util.Properties;
import org.apache.skywalking.apm.collector.agent.jetty.define.AgentJettyModule;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.ApplicationRegisterServletHandler;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.InstanceDiscoveryServletHandler;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.NetworkAddressRegisterServletHandler;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.ServiceNameDiscoveryServiceHandler;
-import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.TraceSegmentServletHandler;
+import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.*;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.naming.AgentJettyNamingHandler;
import org.apache.skywalking.apm.collector.agent.jetty.provider.handler.naming.AgentJettyNamingListener;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
@@ -37,7 +32,9 @@ import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
-import org.apache.skywalking.apm.collector.server.Server;
+import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
+
+import java.util.Properties;
/**
* @author peng-yongsheng
@@ -77,7 +74,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
namingHandlerRegisterService.register(new AgentJettyNamingHandler(namingListener));
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
- Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
+ JettyServer jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(jettyServer);
}
@@ -89,7 +86,7 @@ public class AgentModuleJettyProvider extends ModuleProvider {
return new String[] {ClusterModule.NAME, NamingModule.NAME, JettyManagerModule.NAME};
}
- private void addHandlers(Server jettyServer) {
+ private void addHandlers(JettyServer jettyServer) {
jettyServer.addHandler(new TraceSegmentServletHandler(getManager()));
jettyServer.addHandler(new ApplicationRegisterServletHandler(getManager()));
jettyServer.addHandler(new InstanceDiscoveryServletHandler(getManager()));
diff --git a/apm-collector/apm-collector-boot/src/main/resources/application.yml b/apm-collector/apm-collector-boot/src/main/resources/application.yml
index 18aa44f..bcaa653 100644
--- a/apm-collector/apm-collector-boot/src/main/resources/application.yml
+++ b/apm-collector/apm-collector-boot/src/main/resources/application.yml
@@ -34,6 +34,9 @@ agent_gRPC:
#Set these two setting to open ssl
#ssl_cert_chain_file: $path
#ssl_private_key_file: $path
+
+ #Set your own token to activate auth
+ #authentication: xxxxxx
agent_jetty:
jetty:
host: localhost
diff --git a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java
index efd1537..8a8e32c 100644
--- a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java
+++ b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java
@@ -32,8 +32,6 @@ public interface Server {
void start() throws ServerException;
- void addHandler(ServerHandler handler);
-
boolean isSSLOpen();
boolean isStatusEqual(Server target);
diff --git a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/grpc/GRPCServer.java b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/grpc/GRPCServer.java
index 876c7d3..bc1958a 100644
--- a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/grpc/GRPCServer.java
+++ b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/grpc/GRPCServer.java
@@ -19,13 +19,14 @@
package org.apache.skywalking.apm.collector.server.grpc;
+import io.grpc.BindableService;
+import io.grpc.ServerServiceDefinition;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NettyServerBuilder;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslProvider;
import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerException;
-import org.apache.skywalking.apm.collector.server.ServerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,11 +104,15 @@ public class GRPCServer implements Server {
}
}
- @Override
- public void addHandler(ServerHandler handler) {
- nettyServerBuilder.addService((io.grpc.BindableService) handler);
+ public void addHandler(BindableService handler) {
+ nettyServerBuilder.addService(handler);
+ }
+
+ public void addHandler(ServerServiceDefinition definition) {
+ nettyServerBuilder.addService(definition);
}
+
@Override
public boolean isSSLOpen() {
return sslContextBuilder == null;
diff --git a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyServer.java b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyServer.java
index 5f267c6..5925264 100644
--- a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyServer.java
+++ b/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/jetty/JettyServer.java
@@ -19,18 +19,17 @@
package org.apache.skywalking.apm.collector.server.jetty;
-import java.net.InetSocketAddress;
-import java.util.Objects;
-import javax.servlet.http.HttpServlet;
+import org.apache.skywalking.apm.collector.server.Server;
+import org.apache.skywalking.apm.collector.server.ServerException;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.servlet.ServletMapping;
-import org.apache.skywalking.apm.collector.server.Server;
-import org.apache.skywalking.apm.collector.server.ServerException;
-import org.apache.skywalking.apm.collector.server.ServerHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.InetSocketAddress;
+import java.util.Objects;
+
/**
* @author peng-yongsheng, wusheng
*/
@@ -50,15 +49,18 @@ public class JettyServer implements Server {
this.contextPath = contextPath;
}
- @Override public String hostPort() {
+ @Override
+ public String hostPort() {
return host + ":" + port;
}
- @Override public String serverClassify() {
+ @Override
+ public String serverClassify() {
return "Jetty";
}
- @Override public void initialize() throws ServerException {
+ @Override
+ public void initialize() throws ServerException {
server = new org.eclipse.jetty.server.Server(new InetSocketAddress(host, port));
servletContextHandler = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
@@ -68,10 +70,10 @@ public class JettyServer implements Server {
server.setHandler(servletContextHandler);
}
- @Override public void addHandler(ServerHandler handler) {
+ public void addHandler(JettyHandler handler) {
ServletHolder servletHolder = new ServletHolder();
- servletHolder.setServlet((HttpServlet)handler);
- servletContextHandler.addServlet(servletHolder, ((JettyHandler)handler).pathSpec());
+ servletHolder.setServlet(handler);
+ servletContextHandler.addServlet(servletHolder, handler.pathSpec());
}
@Override
@@ -84,7 +86,8 @@ public class JettyServer implements Server {
return equals(target);
}
- @Override public void start() throws ServerException {
+ @Override
+ public void start() throws ServerException {
logger.info("start server, host: {}, port: {}", host, port);
try {
for (ServletMapping servletMapping : servletContextHandler.getServletHandler().getServletMappings()) {
diff --git a/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-define/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerService.java b/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-define/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerService.java
index d633866..0c1b161 100644
--- a/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-define/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerService.java
+++ b/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-define/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerService.java
@@ -20,7 +20,7 @@
package org.apache.skywalking.apm.collector.grpc.manager.service;
import org.apache.skywalking.apm.collector.core.module.Service;
-import org.apache.skywalking.apm.collector.server.Server;
+import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import java.io.File;
@@ -28,7 +28,7 @@ import java.io.File;
* @author peng-yongsheng, wusheng
*/
public interface GRPCManagerService extends Service {
- Server createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException;
+ GRPCServer createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException;
- Server createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException;
+ GRPCServer createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException;
}
diff --git a/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-provider/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerServiceImpl.java b/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-provider/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerServiceImpl.java
index ea58acb..e65f4a1 100644
--- a/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-provider/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerServiceImpl.java
+++ b/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-provider/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerServiceImpl.java
@@ -19,7 +19,6 @@
package org.apache.skywalking.apm.collector.grpc.manager.service;
-import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerException;
import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
import org.slf4j.Logger;
@@ -42,16 +41,16 @@ public class GRPCManagerServiceImpl implements GRPCManagerService {
}
@Override
- public Server createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException {
+ public GRPCServer createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException {
return createOrChooseServer(host, port, new GRPCServer(host, port));
}
@Override
- public Server createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException {
+ public GRPCServer createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException {
return createOrChooseServer(host, port, new GRPCServer(host, port, certChainFile, privateKeyFile));
}
- private Server createOrChooseServer(String host, int port, GRPCServer newServer) throws ServerCanNotBeCreatedException {
+ private GRPCServer createOrChooseServer(String host, int port, GRPCServer newServer) throws ServerCanNotBeCreatedException {
String id = host + String.valueOf(port);
GRPCServer existServer = servers.get(id);
if (existServer != null) {
diff --git a/apm-collector/apm-collector-jetty-manager/collector-jetty-manager-define/src/main/java/org/apache/skywalking/apm/collector/jetty/manager/service/JettyManagerService.java b/apm-collector/apm-collector-jetty-manager/collector-jetty-manager-define/src/main/java/org/apache/skywalking/apm/collector/jetty/manager/service/JettyManagerService.java
index 2747ae6..3de2daa 100644
--- a/apm-collector/apm-collector-jetty-manager/collector-jetty-manager-define/src/main/java/org/apache/skywalking/apm/collector/jetty/manager/service/JettyManagerService.java
+++ b/apm-collector/apm-collector-jetty-manager/collector-jetty-manager-define/src/main/java/org/apache/skywalking/apm/collector/jetty/manager/service/JettyManagerService.java
@@ -20,14 +20,14 @@
package org.apache.skywalking.apm.collector.jetty.manager.service;
import org.apache.skywalking.apm.collector.core.module.Service;
-import org.apache.skywalking.apm.collector.server.Server;
-import org.apache.skywalking.apm.collector.server.ServerHandler;
+import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
+import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
/**
* @author peng-yongsheng
*/
public interface JettyManagerService extends Service {
- Server createIfAbsent(String host, int port, String contextPath);
+ JettyServer createIfAbsent(String host, int port, String contextPath);
- void addHandler(String host, int port, ServerHandler serverHandler);
+ void addHandler(String host, int port, JettyHandler serverHandler);
}
diff --git a/apm-collector/apm-collector-jetty-manager/collector-jetty-manager-provider/src/main/java/org/apache/skywalking/apm/collector/jetty/manager/service/JettyManagerServiceImpl.java b/apm-collector/apm-collector-jetty-manager/collector-jetty-manager-provider/src/main/java/org/apache/skywalking/apm/collector/jetty/manager/service/JettyManagerServiceImpl.java
index 7bcf25c..8a0de70 100644
--- a/apm-collector/apm-collector-jetty-manager/collector-jetty-manager-provider/src/main/java/org/apache/skywalking/apm/collector/jetty/manager/service/JettyManagerServiceImpl.java
+++ b/apm-collector/apm-collector-jetty-manager/collector-jetty-manager-provider/src/main/java/org/apache/skywalking/apm/collector/jetty/manager/service/JettyManagerServiceImpl.java
@@ -19,15 +19,15 @@
package org.apache.skywalking.apm.collector.jetty.manager.service;
-import java.util.Map;
-import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
import org.apache.skywalking.apm.collector.core.UnexpectedException;
-import org.apache.skywalking.apm.collector.server.Server;
import org.apache.skywalking.apm.collector.server.ServerException;
-import org.apache.skywalking.apm.collector.server.ServerHandler;
+import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
+import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
+
/**
* @author peng-yongsheng
*/
@@ -41,7 +41,7 @@ public class JettyManagerServiceImpl implements JettyManagerService {
this.servers = servers;
}
- @Override public Server createIfAbsent(String host, int port, String contextPath) {
+ @Override public JettyServer createIfAbsent(String host, int port, String contextPath) {
String id = host + String.valueOf(port);
if (servers.containsKey(id)) {
return servers.get(id);
@@ -57,7 +57,7 @@ public class JettyManagerServiceImpl implements JettyManagerService {
}
}
- @Override public void addHandler(String host, int port, ServerHandler serverHandler) {
+ @Override public void addHandler(String host, int port, JettyHandler serverHandler) {
String id = host + String.valueOf(port);
if (servers.containsKey(id)) {
servers.get(id).addHandler(serverHandler);
diff --git a/apm-collector/apm-collector-naming/collector-naming-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/naming/jetty/service/NamingJettyHandlerRegisterService.java b/apm-collector/apm-collector-naming/collector-naming-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/naming/jetty/service/NamingJettyHandlerRegisterService.java
index d73ac83..0be9c60 100644
--- a/apm-collector/apm-collector-naming/collector-naming-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/naming/jetty/service/NamingJettyHandlerRegisterService.java
+++ b/apm-collector/apm-collector-naming/collector-naming-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/naming/jetty/service/NamingJettyHandlerRegisterService.java
@@ -20,10 +20,11 @@
package org.apache.skywalking.apm.collector.naming.jetty.service;
import org.apache.skywalking.apm.collector.core.module.ModuleManager;
+import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
-import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.server.ServerHandler;
+import org.apache.skywalking.apm.collector.server.jetty.JettyHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,8 +45,12 @@ public class NamingJettyHandlerRegisterService implements NamingHandlerRegisterS
this.port = port;
}
- @Override public void register(ServerHandler namingHandler) {
+ @Override
+ public void register(ServerHandler namingHandler) {
+ if (!(namingHandler instanceof JettyHandler)) {
+ throw new IllegalArgumentException("NamingJettyHandlerRegisterService support JettyHandler only.");
+ }
JettyManagerService managerService = moduleManager.find(JettyManagerModule.NAME).getService(JettyManagerService.class);
- managerService.addHandler(this.host, this.port, namingHandler);
+ managerService.addHandler(this.host, this.port, (JettyHandler)namingHandler);
}
}
diff --git a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java
index b098424..9f2f3ac 100644
--- a/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java
+++ b/apm-collector/apm-collector-remote/collector-remote-grpc-provider/src/main/java/org/apache/skywalking/apm/collector/remote/grpc/RemoteModuleGRPCProvider.java
@@ -19,22 +19,23 @@
package org.apache.skywalking.apm.collector.remote.grpc;
-import java.util.Properties;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
+import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
import org.apache.skywalking.apm.collector.cluster.service.ModuleRegisterService;
import org.apache.skywalking.apm.collector.core.module.Module;
import org.apache.skywalking.apm.collector.core.module.ModuleProvider;
import org.apache.skywalking.apm.collector.core.module.ServiceNotProvidedException;
import org.apache.skywalking.apm.collector.grpc.manager.GRPCManagerModule;
+import org.apache.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
import org.apache.skywalking.apm.collector.remote.RemoteModule;
import org.apache.skywalking.apm.collector.remote.grpc.handler.RemoteCommonServiceHandler;
+import org.apache.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService;
import org.apache.skywalking.apm.collector.remote.service.CommonRemoteDataRegisterService;
import org.apache.skywalking.apm.collector.remote.service.RemoteDataRegisterService;
import org.apache.skywalking.apm.collector.remote.service.RemoteSenderService;
-import org.apache.skywalking.apm.collector.server.Server;
-import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
-import org.apache.skywalking.apm.collector.grpc.manager.service.GRPCManagerService;
-import org.apache.skywalking.apm.collector.remote.grpc.service.GRPCRemoteSenderService;
+import org.apache.skywalking.apm.collector.server.grpc.GRPCServer;
+
+import java.util.Properties;
/**
* @author peng-yongsheng
@@ -76,7 +77,7 @@ public class RemoteModuleGRPCProvider extends ModuleProvider {
Integer port = (Integer)config.get(PORT);
GRPCManagerService managerService = getManager().find(GRPCManagerModule.NAME).getService(GRPCManagerService.class);
- Server gRPCServer = managerService.createIfAbsent(host, port);
+ GRPCServer gRPCServer = managerService.createIfAbsent(host, port);
gRPCServer.addHandler(new RemoteCommonServiceHandler(remoteDataRegisterService));
ModuleRegisterService moduleRegisterService = getManager().find(ClusterModule.NAME).getService(ModuleRegisterService.class);
diff --git a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/UIModuleJettyProvider.java b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/UIModuleJettyProvider.java
index d01cda2..e8d7655 100644
--- a/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/UIModuleJettyProvider.java
+++ b/apm-collector/apm-collector-ui/collector-ui-jetty-provider/src/main/java/org/apache/skywalking/apm/collector/ui/jetty/UIModuleJettyProvider.java
@@ -18,7 +18,6 @@
package org.apache.skywalking.apm.collector.ui.jetty;
-import java.util.Properties;
import org.apache.skywalking.apm.collector.cache.CacheModule;
import org.apache.skywalking.apm.collector.cluster.ClusterModule;
import org.apache.skywalking.apm.collector.cluster.service.ModuleListenerService;
@@ -30,13 +29,15 @@ import org.apache.skywalking.apm.collector.jetty.manager.JettyManagerModule;
import org.apache.skywalking.apm.collector.jetty.manager.service.JettyManagerService;
import org.apache.skywalking.apm.collector.naming.NamingModule;
import org.apache.skywalking.apm.collector.naming.service.NamingHandlerRegisterService;
-import org.apache.skywalking.apm.collector.server.Server;
+import org.apache.skywalking.apm.collector.server.jetty.JettyServer;
import org.apache.skywalking.apm.collector.storage.StorageModule;
import org.apache.skywalking.apm.collector.ui.UIModule;
import org.apache.skywalking.apm.collector.ui.jetty.handler.GraphQLHandler;
import org.apache.skywalking.apm.collector.ui.jetty.handler.naming.UIJettyNamingHandler;
import org.apache.skywalking.apm.collector.ui.jetty.handler.naming.UIJettyNamingListener;
+import java.util.Properties;
+
/**
* @author peng-yongsheng
*/
@@ -74,7 +75,7 @@ public class UIModuleJettyProvider extends ModuleProvider {
namingHandlerRegisterService.register(new UIJettyNamingHandler(namingListener));
JettyManagerService managerService = getManager().find(JettyManagerModule.NAME).getService(JettyManagerService.class);
- Server jettyServer = managerService.createIfAbsent(host, port, contextPath);
+ JettyServer jettyServer = managerService.createIfAbsent(host, port, contextPath);
addHandlers(jettyServer);
}
@@ -86,7 +87,7 @@ public class UIModuleJettyProvider extends ModuleProvider {
return new String[] {ClusterModule.NAME, JettyManagerModule.NAME, NamingModule.NAME, CacheModule.NAME, StorageModule.NAME};
}
- private void addHandlers(Server jettyServer) {
+ private void addHandlers(JettyServer jettyServer) {
jettyServer.addHandler(new GraphQLHandler(getManager()));
}
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
index 8dc54a0..893d1aa 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/conf/Config.java
@@ -43,6 +43,12 @@ public class Config {
public static String APPLICATION_CODE = "";
/**
+ * Whether authentication is active depends on the backend setting; see application.yml for more details.
+ * In most scenarios this requires backend extensions; the default implementation provides only basic match auth.
+ */
+ public static String AUTHENTICATION = "";
+
+ /**
* Negative or zero means off, by default. {@link #SAMPLE_N_PER_3_SECS} means sampling at most N {@link TraceSegment}s
* every 3 seconds.
*/
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
index 8d47590..f5ce5b4 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/jvm/JVMService.java
@@ -18,12 +18,7 @@
package org.apache.skywalking.apm.agent.core.jvm;
-import io.grpc.ManagedChannel;
-import java.util.LinkedList;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import io.grpc.Channel;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
@@ -44,6 +39,12 @@ import org.apache.skywalking.apm.network.proto.JVMMetrics;
import org.apache.skywalking.apm.network.proto.JVMMetricsServiceGrpc;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import java.util.LinkedList;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
/**
* The <code>JVMService</code> represents a timer,
* which collects JVM cpu, memory, memorypool and gc info,
@@ -149,7 +150,7 @@ public class JVMService implements BootService, Runnable {
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
- ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
+ Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
stub = JVMMetricsServiceGrpc.newBlockingStub(channel);
}
this.status = status;
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AppAndServiceRegisterClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AppAndServiceRegisterClient.java
index 6a223d6..2cc1253 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AppAndServiceRegisterClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AppAndServiceRegisterClient.java
@@ -18,11 +18,7 @@
package org.apache.skywalking.apm.agent.core.remote;
-import io.grpc.ManagedChannel;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import io.grpc.Channel;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
@@ -37,17 +33,14 @@ import org.apache.skywalking.apm.agent.core.dictionary.OperationNameDictionary;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
import org.apache.skywalking.apm.agent.core.os.OSUtil;
-import org.apache.skywalking.apm.network.proto.Application;
-import org.apache.skywalking.apm.network.proto.ApplicationInstance;
-import org.apache.skywalking.apm.network.proto.ApplicationInstanceHeartbeat;
-import org.apache.skywalking.apm.network.proto.ApplicationInstanceMapping;
-import org.apache.skywalking.apm.network.proto.ApplicationMapping;
-import org.apache.skywalking.apm.network.proto.ApplicationRegisterServiceGrpc;
-import org.apache.skywalking.apm.network.proto.InstanceDiscoveryServiceGrpc;
-import org.apache.skywalking.apm.network.proto.NetworkAddressRegisterServiceGrpc;
-import org.apache.skywalking.apm.network.proto.ServiceNameDiscoveryServiceGrpc;
+import org.apache.skywalking.apm.network.proto.*;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
+import java.util.UUID;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
/**
* @author wusheng
*/
@@ -66,7 +59,7 @@ public class AppAndServiceRegisterClient implements BootService, GRPCChannelList
@Override
public void statusChanged(GRPCChannelStatus status) {
if (GRPCChannelStatus.CONNECTED.equals(status)) {
- ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
+ Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
applicationRegisterServiceBlockingStub = ApplicationRegisterServiceGrpc.newBlockingStub(channel);
instanceDiscoveryServiceBlockingStub = InstanceDiscoveryServiceGrpc.newBlockingStub(channel);
serviceNameDiscoveryServiceBlockingStub = ServiceNameDiscoveryServiceGrpc.newBlockingStub(channel);
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AuthenticationDecorator.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AuthenticationDecorator.java
new file mode 100644
index 0000000..53c9ca8
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/AuthenticationDecorator.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.remote;
+
+import io.grpc.CallOptions;
+import io.grpc.Channel;
+import io.grpc.ClientCall;
+import io.grpc.ClientInterceptor;
+import io.grpc.ClientInterceptors;
+import io.grpc.ForwardingClientCall;
+import io.grpc.Metadata;
+import io.grpc.MethodDescriptor;
+import org.apache.skywalking.apm.agent.core.conf.Config;
+import org.apache.skywalking.apm.util.StringUtil;
+
+/**
+ * Activates the authentication header when Config.Agent.AUTHENTICATION is set.
+ *
+ * @author wu-sheng, zhang xin
+ */
+public class AuthenticationDecorator implements ChannelDecorator {
+ private static final Metadata.Key<String> AUTH_HEAD_HEADER_NAME =
+ Metadata.Key.of("Authentication", Metadata.ASCII_STRING_MARSHALLER);
+
+ @Override
+ public Channel build(Channel channel) {
+ if (StringUtil.isEmpty(Config.Agent.AUTHENTICATION)) {
+ return channel;
+ }
+
+ return ClientInterceptors.intercept(channel, new ClientInterceptor() {
+ @Override
+ public <REQ, RESP> ClientCall<REQ, RESP> interceptCall(MethodDescriptor<REQ, RESP> method,
+ CallOptions options, Channel channel) {
+ return new ForwardingClientCall.SimpleForwardingClientCall<REQ, RESP>(channel.newCall(method, options)) {
+ @Override
+ public void start(Listener<RESP> responseListener, Metadata headers) {
+ headers.put(AUTH_HEAD_HEADER_NAME, Config.Agent.AUTHENTICATION);
+
+ super.start(responseListener, headers);
+ }
+ };
+ }
+ });
+ }
+}
diff --git a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ChannelBuilder.java
similarity index 68%
copy from apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java
copy to apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ChannelBuilder.java
index efd1537..f6b51c5 100644
--- a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ChannelBuilder.java
@@ -16,25 +16,13 @@
*
*/
+package org.apache.skywalking.apm.agent.core.remote;
-package org.apache.skywalking.apm.collector.server;
+import io.grpc.ManagedChannelBuilder;
/**
- * @author peng-yongsheng, wusheng
+ * @author zhang xin
*/
-public interface Server {
-
- String hostPort();
-
- String serverClassify();
-
- void initialize() throws ServerException;
-
- void start() throws ServerException;
-
- void addHandler(ServerHandler handler);
-
- boolean isSSLOpen();
-
- boolean isStatusEqual(Server target);
+public interface ChannelBuilder<B extends ManagedChannelBuilder> {
+ B build(B managedChannelBuilder) throws Exception;
}
diff --git a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ChannelDecorator.java
similarity index 68%
copy from apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java
copy to apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ChannelDecorator.java
index efd1537..146d04e 100644
--- a/apm-collector/apm-collector-component/server-component/src/main/java/org/apache/skywalking/apm/collector/server/Server.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/ChannelDecorator.java
@@ -16,25 +16,13 @@
*
*/
+package org.apache.skywalking.apm.agent.core.remote;
-package org.apache.skywalking.apm.collector.server;
+import io.grpc.Channel;
/**
- * @author peng-yongsheng, wusheng
+ * @author zhang xin
*/
-public interface Server {
-
- String hostPort();
-
- String serverClassify();
-
- void initialize() throws ServerException;
-
- void start() throws ServerException;
-
- void addHandler(ServerHandler handler);
-
- boolean isSSLOpen();
-
- boolean isStatusEqual(Server target);
+public interface ChannelDecorator {
+ Channel build(Channel channel);
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
new file mode 100644
index 0000000..12756cc
--- /dev/null
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannel.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.remote;
+
+import io.grpc.Channel;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.netty.NettyChannelBuilder;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * @author zhangxin
+ */
+public class GRPCChannel {
+ /**
+ * The original managed channel, before any decorators are applied.
+ */
+ private final ManagedChannel originChannel;
+ private final Channel channelWithDecorators;
+
+ private GRPCChannel(String host, int port, List<ChannelBuilder> channelBuilders,
+ List<ChannelDecorator> decorators) throws Exception {
+ ManagedChannelBuilder channelBuilder = NettyChannelBuilder.forAddress(host, port);
+
+ for (ChannelBuilder builder : channelBuilders) {
+ channelBuilder = builder.build(channelBuilder);
+ }
+
+ this.originChannel = channelBuilder.build();
+
+ Channel channel = originChannel;
+ for (ChannelDecorator decorator : decorators) {
+ channel = decorator.build(channel);
+ }
+
+ channelWithDecorators = channel;
+ }
+
+ public static Builder newBuilder(String host, int port) {
+ return new Builder(host, port);
+ }
+
+ public Channel getChannel() {
+ return this.channelWithDecorators;
+ }
+
+ public boolean isTerminated() {
+ return originChannel.isTerminated();
+ }
+
+ public void shutdownNow() {
+ originChannel.shutdownNow();
+ }
+
+ public boolean isShutdown() {
+ return originChannel.isShutdown();
+ }
+
+ public static class Builder {
+ private final String host;
+ private final int port;
+ private final List<ChannelBuilder> channelBuilders;
+ private final List<ChannelDecorator> decorators;
+
+ private Builder(String host, int port) {
+ this.host = host;
+ this.port = port;
+ this.channelBuilders = new LinkedList<ChannelBuilder>();
+ this.decorators = new LinkedList<ChannelDecorator>();
+ }
+
+ public Builder addChannelDecorator(ChannelDecorator interceptor) {
+ this.decorators.add(interceptor);
+ return this;
+ }
+
+ public GRPCChannel build() throws Exception {
+ return new GRPCChannel(host, port, channelBuilders, decorators);
+ }
+
+ public Builder addManagedChannelBuilder(ChannelBuilder builder) {
+ channelBuilders.add(builder);
+ return this;
+ }
+ }
+}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
index 785bc0d..731fa3a 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/GRPCChannelManager.java
@@ -16,15 +16,11 @@
*
*/
-
package org.apache.skywalking.apm.agent.core.remote;
-import io.grpc.ManagedChannel;
+import io.grpc.Channel;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
-import io.grpc.internal.DnsNameResolverProvider;
-import io.grpc.netty.NettyChannelBuilder;
-
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -32,22 +28,21 @@ import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.DefaultNamedThreadFactory;
+import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.agent.core.conf.RemoteDownstreamConfig;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
-import org.apache.skywalking.apm.agent.core.conf.Config;
import org.apache.skywalking.apm.util.RunnableWithExceptionProtection;
/**
- * @author wusheng
+ * @author wusheng, zhang xin
*/
public class GRPCChannelManager implements BootService, Runnable {
private static final ILog logger = LogManager.getLogger(GRPCChannelManager.class);
- private volatile ManagedChannel managedChannel = null;
+ private volatile GRPCChannel managedChannel = null;
private volatile ScheduledFuture<?> connectCheckFuture;
private volatile boolean reconnect = true;
private Random random = new Random();
@@ -61,13 +56,13 @@ public class GRPCChannelManager implements BootService, Runnable {
@Override
public void boot() throws Throwable {
connectCheckFuture = Executors
- .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
- .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
- @Override
- public void handle(Throwable t) {
- logger.error("unexpected exception.", t);
- }
- }), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
+ .newSingleThreadScheduledExecutor(new DefaultNamedThreadFactory("GRPCChannelManager"))
+ .scheduleAtFixedRate(new RunnableWithExceptionProtection(this, new RunnableWithExceptionProtection.CallbackWhenException() {
+ @Override
+ public void handle(Throwable t) {
+ logger.error("unexpected exception.", t);
+ }
+ }), 0, Config.Collector.GRPC_CHANNEL_CHECK_INTERVAL, TimeUnit.SECONDS);
}
@Override
@@ -94,14 +89,13 @@ public class GRPCChannelManager implements BootService, Runnable {
int index = Math.abs(random.nextInt()) % RemoteDownstreamConfig.Collector.GRPC_SERVERS.size();
server = RemoteDownstreamConfig.Collector.GRPC_SERVERS.get(index);
String[] ipAndPort = server.split(":");
- NettyChannelBuilder channelBuilder =
- new TLSChannelBuilder(
- NettyChannelBuilder.forAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
- .nameResolverFactory(new DnsNameResolverProvider())
- .maxInboundMessageSize(1024 * 1024 * 50)
- .usePlaintext(true)
- ).buildTLS();
- managedChannel = channelBuilder.build();
+
+ managedChannel = GRPCChannel.newBuilder(ipAndPort[0], Integer.parseInt(ipAndPort[1]))
+ .addManagedChannelBuilder(new StandardChannelBuilder())
+ .addManagedChannelBuilder(new TLSChannelBuilder())
+ .addChannelDecorator(new AuthenticationDecorator())
+ .build();
+
if (!managedChannel.isShutdown() && !managedChannel.isTerminated()) {
reconnect = false;
notify(GRPCChannelStatus.CONNECTED);
@@ -123,8 +117,8 @@ public class GRPCChannelManager implements BootService, Runnable {
listeners.add(listener);
}
- public ManagedChannel getManagedChannel() {
- return managedChannel;
+ public Channel getChannel() {
+ return managedChannel.getChannel();
}
/**
@@ -150,13 +144,13 @@ public class GRPCChannelManager implements BootService, Runnable {
private boolean isNetworkError(Throwable throwable) {
if (throwable instanceof StatusRuntimeException) {
- StatusRuntimeException statusRuntimeException = (StatusRuntimeException) throwable;
+ StatusRuntimeException statusRuntimeException = (StatusRuntimeException)throwable;
return statusEquals(statusRuntimeException.getStatus(),
- Status.UNAVAILABLE,
- Status.PERMISSION_DENIED,
- Status.UNAUTHENTICATED,
- Status.RESOURCE_EXHAUSTED,
- Status.UNKNOWN
+ Status.UNAVAILABLE,
+ Status.PERMISSION_DENIED,
+ Status.UNAUTHENTICATED,
+ Status.RESOURCE_EXHAUSTED,
+ Status.UNKNOWN
);
}
return false;
diff --git a/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-define/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerService.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/StandardChannelBuilder.java
similarity index 55%
copy from apm-collector/apm-collector-grpc-manager/collector-grpc-manager-define/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerService.java
copy to apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/StandardChannelBuilder.java
index d633866..814d8d4 100644
--- a/apm-collector/apm-collector-grpc-manager/collector-grpc-manager-define/src/main/java/org/apache/skywalking/apm/collector/grpc/manager/service/GRPCManagerService.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/StandardChannelBuilder.java
@@ -16,19 +16,21 @@
*
*/
+package org.apache.skywalking.apm.agent.core.remote;
-package org.apache.skywalking.apm.collector.grpc.manager.service;
-
-import org.apache.skywalking.apm.collector.core.module.Service;
-import org.apache.skywalking.apm.collector.server.Server;
-
-import java.io.File;
+import io.grpc.ManagedChannelBuilder;
+import io.grpc.internal.DnsNameResolverProvider;
/**
- * @author peng-yongsheng, wusheng
+ * @author zhang xin
*/
-public interface GRPCManagerService extends Service {
- Server createIfAbsent(String host, int port) throws ServerCanNotBeCreatedException;
+public class StandardChannelBuilder implements ChannelBuilder {
+ private final static int MAX_INBOUND_MESSAGE_SIZE = 1024 * 1024 * 50;
+ private final static boolean USE_PLAIN_TEXT = true;
- Server createIfAbsent(String host, int port, File certChainFile, File privateKeyFile) throws ServerCanNotBeCreatedException;
+ @Override public ManagedChannelBuilder build(ManagedChannelBuilder managedChannelBuilder) throws Exception {
+ return managedChannelBuilder.nameResolverFactory(new DnsNameResolverProvider())
+ .maxInboundMessageSize(MAX_INBOUND_MESSAGE_SIZE)
+ .usePlaintext(USE_PLAIN_TEXT);
+ }
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TLSChannelBuilder.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TLSChannelBuilder.java
index ce4569d..5dacdf4 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TLSChannelBuilder.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TLSChannelBuilder.java
@@ -22,42 +22,29 @@ import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import io.netty.handler.ssl.SslContextBuilder;
+import java.io.File;
+import javax.net.ssl.SSLException;
import org.apache.skywalking.apm.agent.core.boot.AgentPackageNotFoundException;
import org.apache.skywalking.apm.agent.core.boot.AgentPackagePath;
import org.apache.skywalking.apm.agent.core.conf.Constants;
-import javax.net.ssl.SSLException;
-import java.io.File;
-
/**
* Detects the `/ca` folder in the agent package; if `ca.crt` exists, TLS is enabled (no mutual auth).
*
* @author wusheng
*/
-public class TLSChannelBuilder {
+public class TLSChannelBuilder implements ChannelBuilder<NettyChannelBuilder> {
private static String CA_FILE_NAME = "ca" + Constants.PATH_SEPARATOR + "ca.crt";
- private NettyChannelBuilder nettyChannelBuilder;
-
- public TLSChannelBuilder(NettyChannelBuilder nettyChannelBuilder) {
- this.nettyChannelBuilder = nettyChannelBuilder;
- }
-
- /**
- * Build a TLS supported channel is necessary.
- *
- * @return chanel builder
- * @throws AgentPackageNotFoundException
- * @throws SSLException
- */
- NettyChannelBuilder buildTLS() throws AgentPackageNotFoundException, SSLException {
+ @Override public NettyChannelBuilder build(
+ NettyChannelBuilder managedChannelBuilder) throws AgentPackageNotFoundException, SSLException {
File caFile = new File(AgentPackagePath.getPath(), CA_FILE_NAME);
if (caFile.exists() && caFile.isFile()) {
SslContextBuilder builder = GrpcSslContexts.forClient();
builder.trustManager(caFile);
- nettyChannelBuilder = nettyChannelBuilder.negotiationType(NegotiationType.TLS)
- .sslContext(builder.build());
+ managedChannelBuilder = managedChannelBuilder.negotiationType(NegotiationType.TLS)
+ .sslContext(builder.build());
}
- return nettyChannelBuilder;
+ return managedChannelBuilder;
}
}
diff --git a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
index 4accfa0..9399ffd 100644
--- a/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
+++ b/apm-sniffer/apm-agent-core/src/main/java/org/apache/skywalking/apm/agent/core/remote/TraceSegmentServiceClient.java
@@ -19,23 +19,24 @@
package org.apache.skywalking.apm.agent.core.remote;
-import io.grpc.ManagedChannel;
+import io.grpc.Channel;
import io.grpc.stub.StreamObserver;
-import java.util.List;
-import org.apache.skywalking.apm.agent.core.context.TracingContext;
-import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
import org.apache.skywalking.apm.agent.core.boot.BootService;
import org.apache.skywalking.apm.agent.core.boot.ServiceManager;
+import org.apache.skywalking.apm.agent.core.context.TracingContext;
import org.apache.skywalking.apm.agent.core.context.TracingContextListener;
import org.apache.skywalking.apm.agent.core.context.trace.TraceSegment;
-import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
-import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.agent.core.logging.api.ILog;
import org.apache.skywalking.apm.agent.core.logging.api.LogManager;
+import org.apache.skywalking.apm.commons.datacarrier.DataCarrier;
+import org.apache.skywalking.apm.commons.datacarrier.buffer.BufferStrategy;
+import org.apache.skywalking.apm.commons.datacarrier.consumer.IConsumer;
import org.apache.skywalking.apm.network.proto.Downstream;
import org.apache.skywalking.apm.network.proto.TraceSegmentServiceGrpc;
import org.apache.skywalking.apm.network.proto.UpstreamSegment;
+import java.util.List;
+
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.BUFFER_SIZE;
import static org.apache.skywalking.apm.agent.core.conf.Config.Buffer.CHANNEL_SIZE;
import static org.apache.skywalking.apm.agent.core.remote.GRPCChannelStatus.CONNECTED;
@@ -169,7 +170,7 @@ public class TraceSegmentServiceClient implements BootService, IConsumer<TraceSe
@Override
public void statusChanged(GRPCChannelStatus status) {
if (CONNECTED.equals(status)) {
- ManagedChannel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getManagedChannel();
+ Channel channel = ServiceManager.INSTANCE.findService(GRPCChannelManager.class).getChannel();
serviceStub = TraceSegmentServiceGrpc.newStub(channel);
}
this.status = status;
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index cc0d27a..24f1ec9 100644
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -21,6 +21,9 @@ agent.application_code=Your_ApplicationName
# Negative number means sample traces as many as possible, most likely 100%
# agent.sample_n_per_3_secs=-1
+# Whether authentication is active depends on the backend setting; see application.yml for more details.
+# agent.authentication = xxxx
+
# The max amount of spans in a single segment.
# Through this config item, skywalking keep your application memory cost estimated.
# agent.span_limit_per_segment=300
--
To stop receiving notification emails like this one, please contact
zhangxin@apache.org.