Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/01/29 12:08:45 UTC

[GitHub] [pulsar] xiaotongwang1 opened a new issue #9367: Some changes we have made with pulsar (include some security improve)

xiaotongwang1 opened a new issue #9367:
URL: https://github.com/apache/pulsar/issues/9367


   I recently completed a preliminary evaluation of Pulsar for a project. The work focused on security hardening, and I made several changes to meet basic security requirements.
   
   Basic testing is complete within our project team. So that our future versions can keep tracking the open source community releases, we would like a community committer or PMC member to review these changes. If the community is willing to accept them, I will apply to my company for approval and submit the source code for the features below to the open source community.
   
   # Intrusive modifications
   
   
   ## Explicitly specify Unix line endings for configuration files during packaging
   
   **File path:** /distribution/server/src/assemble/bin.xml
   
   **Change description:**
   
   <lineEnding\>unix</lineEnding\>
   
   Set the line ending to unix so that a package built on Windows does not fail to run when deployed on Linux.
   
   ## Java files for all ZooKeeper access operations (security-related)
   
   **File path:** Too many to list individually. Search for Ids.OPEN\_ACL\_UNSAFE.
   
   **Change description:**
   
   Example:
   
   Original: zkc.create\(path + "/ledgers", new byte\[0\], Ids.OPEN\_ACL\_UNSAFE, CreateMode.PERSISTENT\);
   
   Modified: zkc.create\(path + "/ledgers/available", new byte\[0\], Ids.CREATOR\_ALL\_ACL, CreateMode.PERSISTENT\);
   
   Todo: A smooth upgrade requires an additional system property switch.
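   
   A minimal sketch of the switch mentioned in the Todo, assuming a hypothetical system property name `pulsar.zookeeper.secureAcl` and a hypothetical `ZkAclMode` helper (neither exists in Pulsar). It only decides which ACL mode applies; the call site would map the result to Ids.OPEN\_ACL\_UNSAFE or Ids.CREATOR\_ALL\_ACL.
   
   ```java
   final class ZkAclMode {
       // Hypothetical property name, chosen for illustration only.
       static final String SECURE_ACL_PROPERTY = "pulsar.zookeeper.secureAcl";

       enum Mode { OPEN_ACL_UNSAFE, CREATOR_ALL_ACL }

       // Returns the secure mode only when the property is set to "true".
       static Mode current() {
           return Boolean.getBoolean(SECURE_ACL_PROPERTY)
                   ? Mode.CREATOR_ALL_ACL
                   : Mode.OPEN_ACL_UNSAFE;
       }

       public static void main(String[] args) {
           // Run with -Dpulsar.zookeeper.secureAcl=true to switch modes.
           System.out.println("ACL mode: " + current());
       }
   }
   ```
   
   Defaulting to the open ACL keeps the existing behavior for clusters that have not opted in, which is what a smooth upgrade would likely need.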
   
   ## Automatically obtain the local IP address as the advertisedAddress
   
   **File path:** pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
   
   **Change description:**
   
   Because we have no intranet DNS resolver, advertisedAddress must be an IP address. Setting advertisedAddress to the literal value "ip" makes the broker resolve and advertise its local IP address.
   
   ```
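       public String getAdvertisedAddress() {
           try {
               // The special value "ip" means: advertise the local IP address.
               if ("ip".equals(advertisedAddress)) {
                   return getLocalIp();
               }
               return advertisedAddress;
           } catch (Exception e) {
               // Fall back to the configured value if the local IP cannot be resolved.
               return advertisedAddress;
           }
       }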
   ```
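   
   The snippet above calls getLocalIp() without showing it. One possible implementation is sketched here; the class name `LocalIpResolver` and the selection rule (first non-loopback IPv4 address on an interface that is up) are assumptions, not the code from our branch.
   
   ```java
   import java.net.Inet4Address;
   import java.net.InetAddress;
   import java.net.NetworkInterface;
   import java.net.SocketException;
   import java.util.Collections;
   import java.util.Enumeration;

   final class LocalIpResolver {
       // Returns the first non-loopback IPv4 address found, or null if there is none.
       static String getLocalIp() throws SocketException {
           Enumeration<NetworkInterface> nics = NetworkInterface.getNetworkInterfaces();
           if (nics == null) {
               return null;
           }
           for (NetworkInterface nic : Collections.list(nics)) {
               if (!nic.isUp() || nic.isLoopback()) {
                   continue;
               }
               for (InetAddress addr : Collections.list(nic.getInetAddresses())) {
                   if (addr instanceof Inet4Address && !addr.isLoopbackAddress()) {
                       return addr.getHostAddress();
                   }
               }
           }
           return null;
       }

       // Mirrors getAdvertisedAddress(): only the literal "ip" triggers resolution.
       static String resolveAdvertisedAddress(String configured) {
           if (!"ip".equals(configured)) {
               return configured;
           }
           try {
               String ip = getLocalIp();
               return ip != null ? ip : configured;
           } catch (SocketException e) {
               return configured;
           }
       }
   }
   ```
   
   On hosts with several network interfaces the first match may not be the intended one, so a real deployment would probably need a way to pick the interface.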
   ## Prometheus client IP address trustlist (security suggestion) and JSON output support
   
   **File paths:**
   
   pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
   
   pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/prometheus/PrometheusMetricsServlet.java
   
   **Change description:**
   
   Restrict which client IP addresses may access the Prometheus endpoint. This is a security improvement: even though the endpoint only exposes metrics, requests from clients that are not on the trustlist should be rejected.
   
   The metrics servlet can also return results in JSON format, which makes them easier to consume for monitoring and statistics systems other than Prometheus.
   
   Example:
   ```
                   // ip limit
                   if (limitClientIps != null && !limitClientIps.isEmpty()) {
                       String clientIp = getRemoteAddr(request);
                       if (StringUtils.isEmpty(clientIp) || !limitClientIps.contains(clientIp)) {
                           log.warn("clientIp {} not in the limitClientIps {} ", clientIp, limitClientIps);
   
                           res.setStatus(HttpStatus.FORBIDDEN_403);
                           res.setContentType("text/plain");
                           res.getOutputStream().write("illegal access ip".getBytes());
                           context.complete();
                           return;
                       }
                   }
   
   //...............
   
           if (!jsonFormat) {
   
               metricType(stream, name);
               stream.write(name).write("{cluster=\"").write(cluster).write("\", namespace=\"").write(namespace)
                       .write("\",topic=\"").write(topic).write("\",subscription=\"").write(subscription)
                       .write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
                       .write("\"} ");
               stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
   
           } else {
               JSONObject json = new JSONObject();
               json.put("plugin_id", "broker");
               json.put("ip", LOCAL_IP);
               json.put("type", "gauge");
               json.put(name, value);
               json.put("timestamp", System.currentTimeMillis());
               json.put("cluster", cluster);
   
               JSONObject jsonMetrics = new JSONObject();
               jsonMetrics.put("namespace", namespace);
               jsonMetrics.put("topic", topic);
               jsonMetrics.put("subscription", subscription);
               jsonMetrics.put("consumer_name", consumerName);
               jsonMetrics.put("consumer_id", consumerId);
               json.put("metric", jsonMetrics);
               stream.write(json.toString());
               stream.write('\n');
           }
   ```
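   
   The IP check in the example above boils down to a set lookup. A self-contained sketch follows, assuming a hypothetical `ClientIpAllowlist` class and a comma-separated configuration value (the actual configuration format is not shown in this issue). As in the servlet code, an empty list means no restriction.
   
   ```java
   import java.util.Arrays;
   import java.util.Set;
   import java.util.stream.Collectors;

   final class ClientIpAllowlist {
       private final Set<String> allowedIps;

       ClientIpAllowlist(Set<String> allowedIps) {
           this.allowedIps = Set.copyOf(allowedIps);
       }

       // Parses a list such as "10.0.0.1, 10.0.0.2"; blank input means "no restriction".
       static ClientIpAllowlist parse(String csv) {
           if (csv == null || csv.isBlank()) {
               return new ClientIpAllowlist(Set.of());
           }
           return new ClientIpAllowlist(Arrays.stream(csv.split(","))
                   .map(String::trim)
                   .filter(s -> !s.isEmpty())
                   .collect(Collectors.toSet()));
       }

       // Rejects unknown, null, or empty addresses once at least one IP is configured.
       boolean isAllowed(String clientIp) {
           if (allowedIps.isEmpty()) {
               return true;
           }
           return clientIp != null && !clientIp.isEmpty() && allowedIps.contains(clientIp);
       }
   }
   ```
   
   Exact string matching means that the same address written in a different form (for example an IPv6 address in expanded notation) would not match, so entries should be stored in the form the servlet container reports.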
   ## Make the ZooKeeper session timeout configurable for the BookKeeper client
   
   **File path:** /pulsar-broker/src/main/java/org/apache/pulsar/broker/BookKeeperClientFactoryImpl.java
   
   **Change description:**
   ```
   +          bkConf.setZkTimeout((int)conf.getZooKeeperSessionTimeoutMillis());
   ```
   
   ## Include secure connections in the ZooKeeper connection count reported to Prometheus
   
   **File path:** /pulsar-zookeeper/src/main/java/org/apache/pulsar/zookeeper/ZooKeeperServerAspect.java
   
   **Change description:**
   
   When ZooKeeper runs with secure mode enabled, the reported connection count must also include the number of secure connections.
   
   ```
           Gauge.build().name("zookeeper_server_connections").help("Number of currently opened connections").create()
                   .setChild(new Gauge.Child() {
                       @Override
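                        public double get() {
                            ServerCnxnFactory cnxFactory = zkServer.getServerCnxnFactory();
                            ServerCnxnFactory secCnxFactory = zkServer.getSecureServerCnxnFactory();
                            if (cnxFactory == null && secCnxFactory == null) {
                                // No connection factory is available, so report -1 as before.
                                return -1;
                            }

                            // Start from 0: starting from -1 and using += would under-report by one.
                            int connections = 0;
                            if (cnxFactory != null) {
                                connections += cnxFactory.getNumAliveConnections();
                            }
                            if (secCnxFactory != null) {
                                connections += secCnxFactory.getNumAliveConnections();
                            }
                            return connections;
                        }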
                   }).register();
   
   ```
   # Non-intrusive modifications
   
   ## ZooKeeper security hardening
   
   **Hardening description:**
   
   1.  Enable SASL/DIGEST authentication between clients and servers.
   2.  Enable SASL/DIGEST authentication between servers.
   3.  Harden the default paths during initialization.
   4.  Encrypt sensitive information (the encryption and decryption class is implemented by the service itself).
   5.  Restrict the Prometheus listening IP address and, as for the broker, add a client IP address trustlist.
   
   Why not SASL/Kerberos?
   
   ZooKeeper SASL/Kerberos requires a KDC, which makes the system networking more complex. In addition, the commonly used MIT Kerberos is subject to US EAR export controls. Heimdal Kerberos passed our functional tests, but it has few commercial deployments.
   
   However, the digest algorithm used by SASL/DIGEST-MD5 is insecure, so for now we recommend also enabling TLS channel encryption. Once a new ZooKeeper version supports pluggable authentication (https://issues.apache.org/jira/browse/ZOOKEEPER-2159), we will do further development on top of it.
   
   
   **Change examples:**
   
   1.  Environment variables (JVM options)
   ```
       -Djava.security.auth.login.config=xxxx.jaas.conf
   
       -Dzookeeper.serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory
   
       -Dzookeeper.ssl.keyStore.location=${ZOOBINDIR}/../keystore/zookeeper.jks
   
       -Dzookeeper.ssl.trustStore.location=${ZOOBINDIR}/../keystore/truststore.jks
   
       -Dzookeeper.ssl.protocol=TLSv1.2
   
       -Dzookeeper.ssl.clientAuth=none
   
       -Dzookeeper.ssl.hostnameVerification=false
   
       -Dzookeeper.ssl.ciphersuites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
   
       -Dzookeeper.sslQuorum=true
   
       -Dzookeeper.ssl.quorum.keyStore.location=${ZOOBINDIR}/../keystore/zookeeper.jks
   
       -Dzookeeper.ssl.quorum.trustStore.location=${ZOOBINDIR}/../keystore/truststore.jks
   
       -Dzookeeper.ssl.quorum.protocol=TLSv1.2
   
       -Dzookeeper.ssl.quorum.clientAuth=none
   
       -Dzookeeper.ssl.quorum.hostnameVerification=false
   
       -Dzookeeper.ssl.quorum.ciphersuites=TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384,TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384
   ```
   2.  zoo.conf
   ```
       authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
   
       requireClientAuthScheme=sasl
   
       jaasLoginRenew=3600000
   
       admin.enableServer=false
   
       quorum.auth.enableSasl=true
   
       quorum.auth.learnerRequireSasl=true
   
       quorum.auth.serverRequireSasl=true
   
       quorum.auth.learner.loginContext=QuorumLearner
   
       quorum.auth.server.loginContext=QuorumServer
   
       quorum.cnxn.threads.size=20
   
       4lw.commands.whitelist=stat,ruok,mntr
   
       clientPortAddress=127.0.0.1
   ```
   3.  jaas.conf
   ```
       Server {
           xxx.DigestLoginModule required
           username="admin"
           password="xxx"
           user_admin="xxx";
       };
   
       Client {
           xxx.DigestLoginModule required
           username="admin"
           password="xxx";
       };
   
       QuorumServer {
           xxx.DigestLoginModule required
           username="Quorum"
           password="xxx"
           user_Quorum="xxx";
       };
   
       QuorumLearner {
           xxx.DigestLoginModule required
           username="Quorum"
           password="xxx";
       };
   ```
   4.  Initialization script
   ```
       sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper/config sasl:admin:cdrwa,world:anyone:r
   
       sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper/quota sasl:admin:cdrwa,world:anyone:r
   
       sh zkCli.sh -server 127.0.0.1:2181 setAcl /zookeeper sasl:admin:cdrwa,world:anyone:r
   
       sh zkCli.sh -server 127.0.0.1:2181 setAcl / sasl:admin:cdrwa,world:anyone:r
   ```
   
   ## Broker security hardening
   
   **Hardening description:**
   
   1.  New authentication modules:
       1.  TCP uses SASL/SCRAM-SHA256. (We encrypt the salt, storekey, serverkey, and iterations and store them in ZooKeeper.)
       2.  The HTTP admin interface uses OAuth 2.0 token mode or signature mode. (HTTP currently does not seem to support an authentication exchange that spans multiple round trips, so one of these single-request modes is used instead.)
   
   Why SASL/SCRAM?
   
   We based the choice on the following considerations:
   
   1. Avoid introducing extra components that would complicate the networking.
   
   2. Keep the authentication process as secure as possible even when TLS is not enabled.
   
   We therefore ruled out Kerberos, OAuth2 tokens, and similar options.
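   
   The salt, storekey, serverkey, and iterations stored in ZooKeeper correspond to the server-side credentials defined by SCRAM (RFC 5802). A sketch of how they can be derived with the JDK alone is shown here; the class name `ScramCredentials` is hypothetical, the SASLprep normalization of the password is omitted, and the encryption applied before writing to ZooKeeper is not shown.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.security.GeneralSecurityException;
   import java.security.MessageDigest;
   import javax.crypto.Mac;
   import javax.crypto.SecretKeyFactory;
   import javax.crypto.spec.PBEKeySpec;
   import javax.crypto.spec.SecretKeySpec;

   final class ScramCredentials {
       final byte[] salt;
       final int iterations;
       final byte[] storedKey;
       final byte[] serverKey;

       private ScramCredentials(byte[] salt, int iterations, byte[] storedKey, byte[] serverKey) {
           this.salt = salt;
           this.iterations = iterations;
           this.storedKey = storedKey;
           this.serverKey = serverKey;
       }

       static ScramCredentials derive(char[] password, byte[] salt, int iterations) {
           try {
               // SaltedPassword = Hi(password, salt, i): PBKDF2 with HMAC-SHA-256, 32-byte output.
               PBEKeySpec spec = new PBEKeySpec(password, salt, iterations, 256);
               byte[] saltedPassword = SecretKeyFactory.getInstance("PBKDF2WithHmacSHA256")
                       .generateSecret(spec).getEncoded();
               // ClientKey is never stored; only its hash (StoredKey) is kept on the server.
               byte[] clientKey = hmac(saltedPassword, "Client Key");
               byte[] storedKey = MessageDigest.getInstance("SHA-256").digest(clientKey);
               byte[] serverKey = hmac(saltedPassword, "Server Key");
               return new ScramCredentials(salt.clone(), iterations, storedKey, serverKey);
           } catch (GeneralSecurityException e) {
               throw new IllegalStateException("SCRAM key derivation failed", e);
           }
       }

       private static byte[] hmac(byte[] key, String message) throws GeneralSecurityException {
           Mac mac = Mac.getInstance("HmacSHA256");
           mac.init(new SecretKeySpec(key, "HmacSHA256"));
           return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
       }
   }
   ```
   
   Because the server keeps only StoredKey and ServerKey, the password itself never has to be stored, which fits the goal of keeping authentication reasonably safe without TLS.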
   
   
   **Change examples:**
   
   **SASL/SCRAM-SHA256 sample code:**
   ```
   class AuthenticationStateSCRAM implements AuthenticationState
   
       @Override
       public AuthData authenticate(AuthData authData) throws AuthenticationException {
   
           String clientAddress;
   
           if (!authenticationDataSource.hasDataFromPeer()) {
               throw new AuthenticationException("Pulsar server SCRAM auth does not have a client address");
           }
   
           SocketAddress socketAddress = authenticationDataSource.getPeerAddress();
           clientAddress = socketAddress.toString();
   
           if (ScramStage.INIT.equals(stage)) {
               // Handle the client's first authentication request.
               //....
               return AuthData.of(serverFirstMessage.getBytes(StandardCharsets.UTF_8));
   
           } else if (ScramStage.FIRST.equals(stage)) {
               // Verify the client signature and return the server signature.
               //....
               return AuthData.of(serverFinalMessage.getBytes(StandardCharsets.UTF_8));
   
           } else if (ScramStage.FINAL.equals(stage)) {
               stage = ScramStage.FINISH;
               LOG.info("Pulsar server SCRAM auth finished successfully");
               return null;
           } else {
               LOG.error("Pulsar server SCRAM auth error: invalid stage");
               throw new AuthenticationException(
                   "Authentication datasource stage is invalid: " + stage + ", from client: " + credential);
           }
       }
   ```
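   
   The FIRST stage above elides the proof check. In SCRAM (RFC 5802) the server recomputes ClientKey from the client proof and compares its hash with StoredKey. A sketch of that step is shown here; `ScramProofVerifier` and its method names are hypothetical and this is not the code from our branch.
   
   ```java
   import java.nio.charset.StandardCharsets;
   import java.security.GeneralSecurityException;
   import java.security.MessageDigest;
   import javax.crypto.Mac;
   import javax.crypto.spec.SecretKeySpec;

   final class ScramProofVerifier {
       // Returns true when H(ClientProof XOR HMAC(StoredKey, AuthMessage)) equals StoredKey.
       static boolean verifyClientProof(byte[] storedKey, String authMessage, byte[] clientProof) {
           byte[] clientSignature = hmac(storedKey, authMessage);
           if (clientProof.length != clientSignature.length) {
               return false;
           }
           byte[] clientKey = new byte[clientProof.length];
           for (int i = 0; i < clientKey.length; i++) {
               clientKey[i] = (byte) (clientProof[i] ^ clientSignature[i]);
           }
           // MessageDigest.isEqual compares in constant time.
           return MessageDigest.isEqual(sha256(clientKey), storedKey);
       }

       static byte[] sha256(byte[] data) {
           try {
               return MessageDigest.getInstance("SHA-256").digest(data);
           } catch (GeneralSecurityException e) {
               throw new IllegalStateException(e);
           }
       }

       static byte[] hmac(byte[] key, String message) {
           try {
               Mac mac = Mac.getInstance("HmacSHA256");
               mac.init(new SecretKeySpec(key, "HmacSHA256"));
               return mac.doFinal(message.getBytes(StandardCharsets.UTF_8));
           } catch (GeneralSecurityException e) {
               throw new IllegalStateException(e);
           }
       }
   }
   ```
   
   AuthMessage is the concatenation of the client-first-message-bare, server-first-message, and client-final-message-without-proof, so a proof cannot be replayed in a session with a different server nonce.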
   
   
   **HTTP admin request sample code:**
   
   
   ```
   class AuthenticationProviderSCRAM implements AuthenticationProvider
   
       @Override
       public String authenticate(AuthenticationDataSource authDataSource) throws AuthenticationException {
   
           if (!authDataSource.hasDataFromHttp()) {
               throw new AuthenticationException("Authentication failed: not a HTTP request");
           }
   
           String clientAddress = authDataSource.getPeerAddress().toString();
   
           String httpHeaderValue = authDataSource.getHttpHeader(HTTP_REQ_HEADER_NAME);
           if ((httpHeaderValue == null) || !httpHeaderValue.startsWith(HTTP_REQ_HEADER_VALUE_PREFIX)) {
               throw new AuthenticationException(
                   "Authentication datasource has an invalid HTTP Authorization header: " + clientAddress);
           }
           String token = httpHeaderValue.substring(HTTP_REQ_HEADER_VALUE_PREFIX.length());
           if (token.isEmpty()) {
               throw new AuthenticationException(
                   "Authentication datasource does not have a client message: " + clientAddress);
           }
   
           return validateToken(token);
       }
   	
   ```
   
   ## BookKeeper security hardening
   
   **Hardening description:**
   
   1.  New authentication module: SASL/SCRAM-SHA256.
   
   2.  For the metrics interface, support setting the listening IP address, a client IP address trustlist, and JSON-style output.
   
   
   
   **Change examples:**
   
   **SASL/SCRAM-SHA256 sample code:**
   
   ```
   class SASLBookieAuthProvider implements BookieAuthProvider
   
       @Override
       public void process(AuthToken authData, AuthCallbacks.GenericCallback<AuthToken> cb) {
           try {
               // Handle the client's first authentication request.
               if (ScramStage.INIT.equals(stage)) {
   				//....
   
                   cb.operationComplete(BKException.Code.OK,
                       AuthToken.wrap(serverFirstMessage.getBytes(StandardCharsets.UTF_8)));
   
               }
               // Verify the client signature and return the server signature.
               else if (ScramStage.FIRST.equals(stage)) {
   
   				//....
   
                   cb.operationComplete(BKException.Code.OK,
                       AuthToken.wrap(serverFinalMessage.getBytes(StandardCharsets.UTF_8)));
   
               }
               // Authentication succeeded.
               else if (ScramStage.FINAL.equals(stage)) {
                   stage = ScramStage.FINISH;
   
                   LOG.info("ScramStage.FINAL: bookie server SASL SCRAM handled {}", credential);
                   completeCb.operationComplete(BKException.Code.OK, null);
               } else {
                   LOG.info("ScramStage.UNKNOWN: bookie server SASL SCRAM handling failed {}", credential);
                   cb.operationComplete(BKException.Code.UnauthorizedAccessException,
                       AuthToken.wrap("error=unknowStage".getBytes(StandardCharsets.UTF_8)));
                   completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
               }
   
           } catch (Exception err) {
               // Pass the exception as the last argument so the stack trace is logged.
               LOG.error("bookie server SASL SCRAM handling failed", err);
               cb.operationComplete(BKException.Code.UnauthorizedAccessException,
                   AuthToken.wrap(("error=" + err.getMessage()).getBytes(StandardCharsets.UTF_8)));
               completeCb.operationComplete(BKException.Code.UnauthorizedAccessException, null);
           }
       }
   	
   ```
   
   **Metrics sample code:**
   ```
       @Override
       public void start(Configuration conf) {
           boolean httpEnabled = conf.getBoolean(PROMETHEUS_STATS_HTTP_ENABLE, DEFAULT_PROMETHEUS_STATS_HTTP_ENABLE);
           boolean bkHttpServerEnabled = conf.getBoolean("httpServerEnabled", false);
           // only start its own http server when prometheus http is enabled and bk http server is not enabled.
           if (httpEnabled && !bkHttpServerEnabled) {
               int httpPort = conf.getInt(PROMETHEUS_STATS_HTTP_PORT, DEFAULT_PROMETHEUS_STATS_HTTP_PORT);
               String httpHost = conf.getString(PROMETHEUS_STATS_HTTP_HOST, "0.0.0.0");
               InetSocketAddress httpEndpoint = InetSocketAddress.createUnresolved(httpHost, httpPort);
               this.server = new Server(httpEndpoint);
               ServletContextHandler context = new ServletContextHandler();
               context.setContextPath("/");
               server.setHandler(context);
   
               context.addServlet(new ServletHolder(new PrometheusServlet(this, false)), "/metrics");
               context.addServlet(new ServletHolder(new PrometheusServlet(this, true)), "/metricsjson");
   
               try {
                   server.start();
                   LOG.info("Started Prometheus stats endpoint at {}", httpEndpoint);
               } catch (Exception e) {
                   throw new RuntimeException(e);
               }
           }
       }
   ```
   ## Other utility classes
   
   SCRAM signing and signature verification, AES-GCM encryption and decryption of sensitive information, and a Curator-based tool that watches the SASL user information (salt, storekey, serverkey, and iterations) stored in ZooKeeper.
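   
   As an illustration of the AES-GCM utility, here is a minimal sketch using only the JDK. The class name `AesGcmCodec`, the 12-byte random IV, and the layout of prepending the IV to the ciphertext are assumptions rather than our actual implementation, and key management is out of scope.
   
   ```java
   import java.nio.ByteBuffer;
   import java.security.GeneralSecurityException;
   import java.security.SecureRandom;
   import javax.crypto.Cipher;
   import javax.crypto.SecretKey;
   import javax.crypto.spec.GCMParameterSpec;

   final class AesGcmCodec {
       private static final int IV_BYTES = 12;   // recommended IV size for GCM
       private static final int TAG_BITS = 128;  // authentication tag length
       private static final SecureRandom RANDOM = new SecureRandom();

       // Returns IV || ciphertext || tag. A fresh random IV is generated per call.
       static byte[] encrypt(SecretKey key, byte[] plaintext) {
           try {
               byte[] iv = new byte[IV_BYTES];
               RANDOM.nextBytes(iv);
               Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
               cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
               byte[] ciphertext = cipher.doFinal(plaintext);
               return ByteBuffer.allocate(iv.length + ciphertext.length)
                       .put(iv).put(ciphertext).array();
           } catch (GeneralSecurityException e) {
               throw new IllegalStateException("AES-GCM encryption failed", e);
           }
       }

       // Reads the IV from the first 12 bytes; fails if the data or tag was modified.
       static byte[] decrypt(SecretKey key, byte[] message) {
           try {
               Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
               cipher.init(Cipher.DECRYPT_MODE, key,
                       new GCMParameterSpec(TAG_BITS, message, 0, IV_BYTES));
               return cipher.doFinal(message, IV_BYTES, message.length - IV_BYTES);
           } catch (GeneralSecurityException e) {
               throw new IllegalStateException("AES-GCM decryption failed", e);
           }
       }
   }
   ```
   
   GCM authenticates as well as encrypts, so a tampered value read back from ZooKeeper is rejected instead of being decrypted into garbage.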
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on issue #9367: Some changes we have made with pulsar (include some security improve)

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #9367:
URL: https://github.com/apache/pulsar/issues/9367#issuecomment-1058893824


   The issue had no activity for 30 days, mark with Stale label.





[GitHub] [pulsar] codelipenghui commented on issue #9367: Some changes we have made with pulsar (include some security improve)

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on issue #9367:
URL: https://github.com/apache/pulsar/issues/9367#issuecomment-770219946


   @xiaotongwang1 Thanks for the detailed description. A better way to advance these fixes is to handle them one by one. It seems you have mentioned almost 10 issues, so it is better to open a new issue for each one and discuss it in the community. Some of them might introduce compatibility issues and should be discussed on the dev mailing list (dev@pulsar.apache.org). Anything related to BookKeeper should be tracked in the bookkeeper repo. Does that work for you?

