You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2020/07/20 01:57:52 UTC

[GitHub] [rocketmq] androidkaifa1 opened a new issue #2170: Concurrency security issues

androidkaifa1 opened a new issue #2170:
URL: https://github.com/apache/rocketmq/issues/2170


    public List<RegisterBrokerResult> registerBrokerAll(
           final String clusterName,
           final String brokerAddr,
           final String brokerName,
           final long brokerId,
           final String haServerAddr,
           final TopicConfigSerializeWrapper topicConfigWrapper,
           final List<String> filterServerList,
           final boolean oneway,
           final int timeoutMills,
           final boolean compressed) {
   
           // 初始化一个List,存放每个NameServer注册结果的
          <font color="red"> // 多线程 会有并发问题吧
           final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
          </font>
           // 获取 NameServer 地址列表
           List<String> nameServerAddressList =
                   this.remotingClient.getNameServerAddressList();
           if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
   
               // 构建请求头,在请求头里面放很多的信息,比如说 BrokerId 和 BrokerName
               final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
               requestHeader.setBrokerAddr(brokerAddr);
               requestHeader.setBrokerId(brokerId);
               requestHeader.setBrokerName(brokerName);
               requestHeader.setClusterName(clusterName);
               requestHeader.setHaServerAddr(haServerAddr);
               requestHeader.setCompressed(compressed);
   
               // 构建请求体,包含一些配置
               RegisterBrokerBody requestBody = new RegisterBrokerBody();
               requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
               requestBody.setFilterServerList(filterServerList);
               final byte[] body = requestBody.encode(compressed);
               final int bodyCrc32 = UtilAll.crc32(body);
               requestHeader.setBodyCrc32(bodyCrc32);
               // 使用CountDownLatch同步计数器,保证注册完全部的 NameServer之后才往下走,
               // 执行其他逻辑
               final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
               // 遍历NameServer 地址列表,使用线程池去注册
               for (final String namesrvAddr : nameServerAddressList) {
                   brokerOuterExecutor.execute(new Runnable() {
                       @Override
                       public void run() {
                           try {
                               // 调用 registerBroker 真正执行注册
                               RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);
                               **if (result != null) {
                                   // 注册成功结果放到一个List里去
                                   registerBrokerResultList.add(result);
                               }**
   
                               log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                           } catch (Exception e) {
                               log.warn("registerBroker Exception, {}", namesrvAddr, e);
                           } finally {
                               // 注册完,执行 countDownLatch.countDown(); 同步计数器 -1
                               countDownLatch.countDown();
                           }
                       }
                   });
               }
   
               try {
                   // 等待所有 NameServer 都注册完,才返回注册结果
                   countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
               } catch (InterruptedException e) {
               }
           }
   
           return registerBrokerResultList;
       }


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] francisoliverlee commented on issue #2170: Concurrency security issues

Posted by GitBox <gi...@apache.org>.
francisoliverlee commented on issue #2170:
URL: https://github.com/apache/rocketmq/issues/2170#issuecomment-662186366


   @androidkaifa1 
   in RocketMQ4.7.0, the methods call registerBrokerAll() are marked as synchronized finally,
   those methods in BrokerController as bellow:
   public synchronized void registerIncrementBrokerData()
   public synchronized void registerBrokerAll ()


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] androidkaifa1 commented on issue #2170: Concurrency security issues

Posted by GitBox <gi...@apache.org>.
androidkaifa1 commented on issue #2170:
URL: https://github.com/apache/rocketmq/issues/2170#issuecomment-662198516


   @francisoliverlee 
   Thanks for your answer, I know what you mean. But `public synchronized void registerBrokerAll()` This code just makes the following code
   
   ```java
    // 启动了一个定时调度的任务,让他去给 NameServer 进行注册
           // 默认 30s 发送一次
           this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
   
               @Override
               public void run() {
                   try {
                       // 将自己注册到NameServer
                       BrokerController.this.
                               registerBrokerAll(true, false, brokerConfig.isForceRegister());
                   } catch (Throwable e) {
                       log.error("registerBrokerAll Exception", e);
                   }
               }
           }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
   
   ```
   Thread safety in the thread pool. In other words, he only guarantees the thread safety of different batches of timing tasks. However, it is still not possible to guarantee the thread safety of timed tasks in the same batch. For example: send a heartbeat a certain time. Broker needs to send heartbeats to 10 NameServers. `public List<RegisterBrokerResult> registerBrokerAll`The method is to open 10 threads to send heartbeats, if the results are returned at the same time, modify `registerBrokerResultList ArrayList` There are still concurrency security issues.
   
   `public synchronized void registerBrokerAll()` This code just guarantees, for example, time 0 and time 30. The heartbeats they send have no concurrent security issues.
   
   The logic in MQ is similar to the following code logic.
   
   ```java
   package com.example.test.arraylistrocketmq;
   
   import com.google.common.collect.Lists;
   import org.junit.Test;
   
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   
   /**
    * @author Chen
    * @version 1.0
    * @date 2020/7/20 9:20
    * @description:
    */
   public class TestChen2 {
   
   
       @Test
       public void test() throws InterruptedException {
           for (int i = 0; i < 100; i++) {
               int finalI = i;
               Thread thread = new Thread(new Runnable() {
                   @Override
                   public void run() {
                       deal(finalI);
                   }
               });
               thread.start();
   
               thread.join();
           }
       }
   
       public synchronized void deal(int index) {
           // 初始化一个List,存放每个NameServer注册结果的
           final List<RegisterBrokerResult> registerBrokerResultList =
                   Lists.newArrayList();
           // 获取 NameServer 地址列表
           List<String> nameServerAddressList = new ArrayList<>();
           for (int j = 0; j < 10; j++) {
               nameServerAddressList.add("192.168.0." + j);
           }
           AtomicInteger atomicInteger = new AtomicInteger(0);
           // 执行其他逻辑
           final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
   
           BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
                   new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
   
           // 遍历NameServer 地址列表,使用线程池去注册
           for (final String namesrvAddr : nameServerAddressList) {
               brokerOuterExecutor.execute(new Runnable() {
                   @Override
                   public void run() {
                       try {
                           // 调用 registerBroker 真正执行注册
                           RegisterBrokerResult result = new RegisterBrokerResult();
                           result.setHaServerAddr("chen" + atomicInteger.getAndIncrement());
                           Thread.sleep(100);
                           if (result != null) {
                               // 注册成功结果放到一个List里去
                               registerBrokerResultList.add(result);
                           }
   
                       } catch (Exception e) {
                           System.out.println("-----------wei---------------> " + index);
                           System.out.println(e);
                           System.out.println("-----------wei---------------> " + index);
                       } finally {
                           // 注册完,执行 countDownLatch.countDown(); 同步计数器 -1
                           countDownLatch.countDown();
                       }
                   }
               });
           }
   
           try {
               // 等待所有 NameServer 都注册完,才返回注册结果
               countDownLatch.await(1000000, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
           }
   
           System.out.println(registerBrokerResultList.size());
   //        System.out.println(registerBrokerResultList);
           System.out.println("-----------chen---------------------------------------> ");
       }
   }
   
   ```
   
   But the above code cannot guarantee thread safety.
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] androidkaifa1 commented on issue #2170: Concurrency security issues

Posted by GitBox <gi...@apache.org>.
androidkaifa1 commented on issue #2170:
URL: https://github.com/apache/rocketmq/issues/2170#issuecomment-663380640


   @francisoliverlee 
   Thanks for your answer. I have fixed it. and the link is [ https://github.com/apache/rocketmq/pull/2203]( https://github.com/apache/rocketmq/pull/2203)
   
   ![image](https://user-images.githubusercontent.com/17943128/88368145-02a4ef80-cdc0-11ea-97f7-5cfb11fcc4e8.png)
   
   Here is My changes.Thank you!
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] francisoliverlee commented on issue #2170: Concurrency security issues

Posted by GitBox <gi...@apache.org>.
francisoliverlee commented on issue #2170:
URL: https://github.com/apache/rocketmq/issues/2170#issuecomment-663337315


   registerBrokerResultList should be thread-safe,  nice of u to fix it


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] RongtongJin closed issue #2170: Concurrency security issues

Posted by GitBox <gi...@apache.org>.
RongtongJin closed issue #2170:
URL: https://github.com/apache/rocketmq/issues/2170


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [rocketmq] androidkaifa1 commented on issue #2170: Concurrency security issues

Posted by GitBox <gi...@apache.org>.
androidkaifa1 commented on issue #2170:
URL: https://github.com/apache/rocketmq/issues/2170#issuecomment-662197454


   @francisoliverlee 
   感谢您的回答,我知道您的意思。但是`public synchronized void registerBrokerAll()` 这段代码只是让下面这段代码
   
   ```java
    // 启动了一个定时调度的任务,让他去给 NameServer 进行注册
           // 默认 30s 发送一次
           this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
   
               @Override
               public void run() {
                   try {
                       // 将自己注册到NameServer
                       BrokerController.this.
                               registerBrokerAll(true, false, brokerConfig.isForceRegister());
                   } catch (Throwable e) {
                       log.error("registerBrokerAll Exception", e);
                   }
               }
           }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
   
   ```
   线程池里面的线程安全。也就是说 ,他只是保证了定时任务不同批次任务的线程安全。但是仍然无法保证定时任务同一个批次内的线程安全。比如说:某次发送心跳。Broker需要向 10个NameServer 发送心跳。`public List<RegisterBrokerResult> registerBrokerAll`方法是开启10个线程去发送心跳,如果此时同时返回结果,同时修改`registerBrokerResultList`这个`ArrayList`仍然是有并发安全问题的。
   
   `public synchronized void registerBrokerAll()` 这段代码只是保证 比如说 时刻 0 和时刻 30 。他们发送的心跳没有并发安全问题。
   
   
   MQ里面的逻辑和下面的代码逻辑类似。
   
   ```java
   package com.example.test.arraylistrocketmq;
   
   import com.google.common.collect.Lists;
   import org.junit.Test;
   
   import java.util.ArrayList;
   import java.util.List;
   import java.util.concurrent.ArrayBlockingQueue;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;
   
   /**
    * @author Chen
    * @version 1.0
    * @date 2020/7/20 9:20
    * @description:
    */
   public class TestChen2 {
   
   
       @Test
       public void test() throws InterruptedException {
           for (int i = 0; i < 100; i++) {
               int finalI = i;
               Thread thread = new Thread(new Runnable() {
                   @Override
                   public void run() {
                       deal(finalI);
                   }
               });
               thread.start();
   
               thread.join();
           }
       }
   
       public synchronized void deal(int index) {
           // 初始化一个List,存放每个NameServer注册结果的
           final List<RegisterBrokerResult> registerBrokerResultList =
                   Lists.newArrayList();
           // 获取 NameServer 地址列表
           List<String> nameServerAddressList = new ArrayList<>();
           for (int j = 0; j < 10; j++) {
               nameServerAddressList.add("192.168.0." + j);
           }
           AtomicInteger atomicInteger = new AtomicInteger(0);
           // 执行其他逻辑
           final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
   
           BrokerFixedThreadPoolExecutor brokerOuterExecutor = new BrokerFixedThreadPoolExecutor(4, 10, 1, TimeUnit.MINUTES,
                   new ArrayBlockingQueue<Runnable>(32), new ThreadFactoryImpl("brokerOutApi_thread_", true));
   
           // 遍历NameServer 地址列表,使用线程池去注册
           for (final String namesrvAddr : nameServerAddressList) {
               brokerOuterExecutor.execute(new Runnable() {
                   @Override
                   public void run() {
                       try {
                           // 调用 registerBroker 真正执行注册
                           RegisterBrokerResult result = new RegisterBrokerResult();
                           result.setHaServerAddr("chen" + atomicInteger.getAndIncrement());
                           Thread.sleep(100);
                           if (result != null) {
                               // 注册成功结果放到一个List里去
                               registerBrokerResultList.add(result);
                           }
   
                       } catch (Exception e) {
                           System.out.println("-----------wei---------------> " + index);
                           System.out.println(e);
                           System.out.println("-----------wei---------------> " + index);
                       } finally {
                           // 注册完,执行 countDownLatch.countDown(); 同步计数器 -1
                           countDownLatch.countDown();
                       }
                   }
               });
           }
   
           try {
               // 等待所有 NameServer 都注册完,才返回注册结果
               countDownLatch.await(1000000, TimeUnit.MILLISECONDS);
           } catch (InterruptedException e) {
           }
   
           System.out.println(registerBrokerResultList.size());
   //        System.out.println(registerBrokerResultList);
           System.out.println("-----------chen---------------------------------------> ");
       }
   }
   
   ```
   
   但是上面的代码无法保证线程安全。
   
   
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org