Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2021/06/05 03:09:03 UTC

[GitHub] [rocketmq] dragon-zhang edited a comment on pull request #2895: Stage Ordered and disordered comsumption support?

dragon-zhang edited a comment on pull request #2895:
URL: https://github.com/apache/rocketmq/pull/2895#issuecomment-847584711


   > It's a very nice PR that improves ordered messages, and I have two questions:
   > ### Q1. How is the first period defined, and why?
   > ### Q2. Using ordered messages means consuming messages in order and then performing the NEXT-STEP on the consume results in order as well, like CRUD in MySQL. Within one period, how do you make sure the NEXT-STEP stays ordered?
   
   Thank you for your question. I'm very happy that you understand the meaning of this PR.
   
   About Q1: the stages are entirely user-defined. The user is free to decide how many stages there are and how large each one is. For example, in `org.apache.rocketmq.client.impl.consumer.ConsumeMessagePeriodicConcurrentlyServiceTest#test05MessageListenerOrderlyToConcurrently` I defined 101 stages: 100 stages with sizes 1, 2, 3, ..., 98, 99, 100, followed by a final stage of unbounded size.
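
   To make the stage definition concrete, here is a minimal sketch of how a message's position could be mapped to its stage. `StageLookup` and `stageOf` are hypothetical names used only for illustration; they are not the PR's API, and the PR may compute this differently.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   // Hypothetical helper, not part of the PR: maps a message's position
   // to the stage it belongs to, given the sizes of the bounded stages.
   final class StageLookup {

       /**
        * @param position   zero-based position of the message
        * @param stageSizes sizes of the bounded stages, in order; any position past
        *                   their total falls into one final unbounded stage
        * @return zero-based stage index; stageSizes.size() denotes the unbounded stage
        */
       static int stageOf(int position, List<Integer> stageSizes) {
           long upperBound = 0;
           for (int stage = 0; stage < stageSizes.size(); stage++) {
               upperBound += stageSizes.get(stage);
               if (position < upperBound) {
                   return stage;
               }
           }
           return stageSizes.size();
       }

       public static void main(String[] args) {
           // Stage sizes 1, 2, ..., 100, as in the test mentioned above
           List<Integer> sizes = new ArrayList<>();
           for (int size = 1; size <= 100; size++) {
               sizes.add(size);
           }
           System.out.println(stageOf(0, sizes));    // 0: the first stage holds one message
           System.out.println(stageOf(2, sizes));    // 1: the second stage covers positions 1..2
           System.out.println(stageOf(5049, sizes)); // 99: last position of the 100th stage
           System.out.println(stageOf(5050, sizes)); // 100: first message of the unbounded stage
       }
   }
   ```

   The bounded stages cover 1 + 2 + ... + 100 = 5050 positions, so everything from position 5050 onward lands in the unbounded stage.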
   
   About Q2, I wrote a simple example to answer your question:
   ```java
   import java.util.ArrayList;
   import java.util.Collection;
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.ConcurrentNavigableMap;
   import java.util.concurrent.ConcurrentSkipListMap;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.atomic.AtomicInteger;
   
   public class Example {
       public static class PriorityConcurrentEngine {
   
           /**
            * highest priority
            */
           public static final Integer MAX_PRIORITY = Integer.MIN_VALUE;
   
           /**
            * lowest priority
            */
           public static final Integer MIN_PRIORITY = Integer.MAX_VALUE;
   
           private final ExecutorService executor = Executors.newFixedThreadPool(32);
   
           private final AtomicInteger currentStage = new AtomicInteger(0);
   
           /**
            * Other value types also work here, such as {@code ConcurrentNavigableMap<Integer, Queue<Callable>>}
            */
           private final ConcurrentNavigableMap<Integer, Queue<Runnable>> priorityTasks = new ConcurrentSkipListMap<>();
   
           private volatile boolean run = true;
   
           private final Thread consumer = new Thread(() -> {
               while (run) {
                   invokeAllNow();
               }
           });
   
           public static <E> List<E> pollAllTask(Queue<E> tasks) {
               List<E> list = new ArrayList<>();
               while (tasks != null && !tasks.isEmpty()) {
                   E task = tasks.poll();
                   list.add(task);
               }
               return list;
           }
   
           public void start() {
               consumer.start();
           }
   
           public void shutdown() throws InterruptedException {
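               run = false;
               // Let the dispatcher thread finish its current pass first, so that
               // no task is submitted to an executor that is already shut down
               consumer.join();
               executor.shutdown();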
           }
   
           public int getAndAddCurrentStage(int delta) {
               return currentStage.getAndAdd(delta);
           }
   
           public final void runPriorityAsync(Integer priority, Runnable... tasks) {
               if (null == tasks || tasks.length == 0) {
                   return;
               }
               // Atomically create the queue for this priority on first use
               Queue<Runnable> queue = priorityTasks.computeIfAbsent(priority, k -> new ConcurrentLinkedQueue<>());
               for (Runnable runnable : tasks) {
                   queue.offer(runnable);
               }
           }
   
           public final void runPriorityAsync(Integer priority, Collection<Runnable> tasks) {
               if (null == tasks || tasks.isEmpty()) {
                   return;
               }
               // Atomically create the queue for this priority on first use
               Queue<Runnable> queue = priorityTasks.computeIfAbsent(priority, k -> new ConcurrentLinkedQueue<>());
               for (Runnable runnable : tasks) {
                   queue.offer(runnable);
               }
           }
   
           public final void runAsync(Queue<Runnable> tasks) {
               runAsync(pollAllTask(tasks));
           }
   
           public final void runAsync(Collection<Runnable> tasks) {
               if (null == tasks || tasks.isEmpty()) {
                   return;
               }
               List<CompletableFuture<Void>> list = new ArrayList<>(tasks.size());
               for (Runnable task : tasks) {
                   // Other thread pools can also be used here
                   list.add(CompletableFuture.runAsync(task, executor));
               }
               executeAsync(list);
           }
   
           public final <T> List<T> executeAsync(Collection<CompletableFuture<T>> tasks) {
               if (null == tasks || tasks.isEmpty()) {
                   return new ArrayList<>();
               }
               try {
                   CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
               } catch (Exception e) {
                   e.printStackTrace();
               }
               return getResultIgnoreException(tasks);
           }
   
           public final <T> List<T> getResultIgnoreException(Collection<CompletableFuture<T>> tasks) {
               List<T> result = new ArrayList<>(tasks.size());
               for (CompletableFuture<T> completableFuture : tasks) {
                   if (null == completableFuture) {
                       continue;
                   }
                   try {
                       T response = completableFuture.get();
                       if (null != response) {
                           result.add(response);
                       }
                   } catch (Exception e) {
                       e.printStackTrace();
                   }
               }
               return result;
           }
   
           public void invokeAllNow() {
               synchronized (priorityTasks) {
                   // ConcurrentSkipListMap iterates keys in ascending order, so a smaller
                   // priority value (an earlier stage) is always dispatched first.
                   // runAsync(Queue) drains the stage and blocks until every task in it has
                   // finished, so the next stage only starts after the previous one completes.
                   // The execution order of tasks within one stage is not guaranteed.
                   for (Queue<Runnable> queue : priorityTasks.values()) {
                       runAsync(queue);
                   }
               }
           }
       }
   
       public static void main(String[] args) throws Exception {
           PriorityConcurrentEngine engine = new PriorityConcurrentEngine();
           List<Integer> stageDefinitions = new ArrayList<>();
           for (int i = 1; i <= 10; i++) {
               int sum = 0;
               for (int j = 1; j <= i; j++) {
                   sum = sum + j;
               }
               stageDefinitions.add(sum);
           }
           engine.start();
           for (int i = 0; i < 100; i++) {
               int currentStage = engine.getAndAddCurrentStage(1);
               int index = -1;
               for (int j = 0; j < stageDefinitions.size(); j++) {
                   if (currentStage < stageDefinitions.get(j)) {
                       index = j;
                       break;
                   }
               }
               int finalIndex = index;
               if (index == -1) {
                   engine.runPriorityAsync(PriorityConcurrentEngine.MIN_PRIORITY, () -> {
                       try {
                           //simulate business call
                           Thread.sleep(100);
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                       System.out.println(finalIndex);
                   });
               } else {
                   engine.runPriorityAsync(index, () -> {
                       try {
                           //simulate business call
                           Thread.sleep(100);
                       } catch (InterruptedException e) {
                           e.printStackTrace();
                       }
                       System.out.println(finalIndex);
                   });
               }
           }
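           // The thread pool (PriorityConcurrentEngine#executor) has 32 threads, and each task takes about 100 ms.
           // Stages 0-9 hold 1, 2, ..., 10 tasks; each fits in the pool, so each takes one 100 ms round: ceil(n/32) = 1.
           // The remaining 45 tasks run at MIN_PRIORITY and need ceil(45/32) = 2 rounds.
           // So the staged approach (MessageListenerPeriodicConcurrently) takes about (1*10 + 2) * 100 = 1200 ms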
           Thread.sleep(1300);
           engine.shutdown();
   
           for (int i = 0; i < 100; i++) {
               Thread.sleep(100);
               System.out.println(i);
           }
           // MessageListenerOrderly takes about 100*100=10000ms
       }
   }
   ```
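
   The timing estimates in the comments at the end of `main` can be checked with a small calculation. This is only an idealized sketch: `StageTimingEstimate` and `estimateMillis` are hypothetical names, and the model assumes every task takes the same time, each stage starts only after the previous one has finished, and scheduling overhead is zero.

   ```java
   // Hypothetical helper for illustration only: estimates the wall-clock time
   // of stage-by-stage execution on a fixed-size thread pool.
   final class StageTimingEstimate {

       static long estimateMillis(int[] stageSizes, int poolSize, long taskMillis) {
           long rounds = 0;
           for (int size : stageSizes) {
               // A stage of `size` tasks needs ceil(size / poolSize) rounds
               rounds += (size + poolSize - 1) / poolSize;
           }
           return rounds * taskMillis;
       }

       public static void main(String[] args) {
           // Ten stages of sizes 1..10, then the remaining 45 of the 100 messages
           int[] staged = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 45};
           System.out.println(estimateMillis(staged, 32, 100)); // 1200

           // Strictly ordered consumption: 100 messages handled one at a time
           System.out.println(estimateMillis(new int[] {100}, 1, 100)); // 10000
       }
   }
   ```

   Under these assumptions the staged run needs 12 rounds of 100 ms, while strictly ordered consumption needs 100 rounds.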

