You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2018/05/28 09:01:50 UTC

[rocketmq] 02/02: Revert "Fix Concurrent issue of StoreStatsService"

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch revert-261-fix-store-stat-concurrent
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit ae48e2a1582f67a8ffa9c1484306f2474aef9da8
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Mon May 28 17:01:46 2018 +0800

    Revert "Fix Concurrent issue of StoreStatsService"
    
    This reverts commit fdc6e87cbe7adb8bc8f5991e06392eee73bc5151.
---
 .../apache/rocketmq/store/StoreStatsService.java   | 15 ++---
 .../rocketmq/store/StoreStatsServiceTest.java      | 77 ----------------------
 2 files changed, 4 insertions(+), 88 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
index 8862fd7..586947c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
@@ -21,7 +21,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantLock;
 import org.apache.rocketmq.common.ServiceThread;
@@ -43,9 +42,9 @@ public class StoreStatsService extends ServiceThread {
 
     private final AtomicLong putMessageFailedTimes = new AtomicLong(0);
 
-    private final ConcurrentMap<String, AtomicLong> putMessageTopicTimesTotal =
+    private final Map<String, AtomicLong> putMessageTopicTimesTotal =
         new ConcurrentHashMap<String, AtomicLong>(128);
-    private final ConcurrentMap<String, AtomicLong> putMessageTopicSizeTotal =
+    private final Map<String, AtomicLong> putMessageTopicSizeTotal =
         new ConcurrentHashMap<String, AtomicLong>(128);
 
     private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0);
@@ -546,10 +545,7 @@ public class StoreStatsService extends ServiceThread {
         AtomicLong rs = putMessageTopicSizeTotal.get(topic);
         if (null == rs) {
             rs = new AtomicLong(0);
-            AtomicLong previous = putMessageTopicSizeTotal.putIfAbsent(topic, rs);
-            if(previous != null){
-                rs = previous;
-            }
+            putMessageTopicSizeTotal.put(topic, rs);
         }
         return rs;
     }
@@ -558,10 +554,7 @@ public class StoreStatsService extends ServiceThread {
         AtomicLong rs = putMessageTopicTimesTotal.get(topic);
         if (null == rs) {
             rs = new AtomicLong(0);
-            AtomicLong previous = putMessageTopicTimesTotal.putIfAbsent(topic, rs);
-            if(previous != null){
-                rs = previous;
-            }
+            putMessageTopicTimesTotal.put(topic, rs);
         }
         return rs;
     }
diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java
deleted file mode 100644
index bb39bf9..0000000
--- a/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.apache.rocketmq.store;
-
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.junit.Test;
-
-/**
- * @author song
- */
-public class StoreStatsServiceTest {
-
-  @Test
-  public void getSinglePutMessageTopicSizeTotal() throws Exception {
-    final StoreStatsService storeStatsService = new StoreStatsService();
-    int num = Runtime.getRuntime().availableProcessors() * 2;
-    for (int j = 0; j < 100; j++) {
-      final AtomicReference<AtomicLong> reference = new AtomicReference<>(null);
-      final CountDownLatch latch = new CountDownLatch(num);
-      final CyclicBarrier barrier = new CyclicBarrier(num);
-      for (int i = 0; i < num; i++) {
-        new Thread(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              barrier.await();
-              AtomicLong atomicLong = storeStatsService.getSinglePutMessageTopicSizeTotal("test");
-              if(reference.compareAndSet(null,atomicLong)){
-              }else if (reference.get() != atomicLong){
-                throw new RuntimeException("Reference should be same!");
-              }
-            } catch (InterruptedException | BrokenBarrierException e) {
-              e.printStackTrace();
-            }finally {
-              latch.countDown();
-            }
-          }
-        }).start();
-      }
-      latch.await();
-    }
-  }
-
-  @Test
-  public void getSinglePutMessageTopicTimesTotal() throws Exception {
-    final StoreStatsService storeStatsService = new StoreStatsService();
-    int num = Runtime.getRuntime().availableProcessors() * 2;
-    for (int j = 0; j < 100; j++) {
-      final AtomicReference<AtomicLong> reference = new AtomicReference<>(null);
-      final CountDownLatch latch = new CountDownLatch(num);
-      final CyclicBarrier barrier = new CyclicBarrier(num);
-      for (int i = 0; i < num; i++) {
-        new Thread(new Runnable() {
-          @Override
-          public void run() {
-            try {
-              barrier.await();
-              AtomicLong atomicLong = storeStatsService.getSinglePutMessageTopicTimesTotal("test");
-              if(reference.compareAndSet(null,atomicLong)){
-              }else if (reference.get() != atomicLong){
-                throw new RuntimeException("Reference should be same!");
-              }
-            } catch (InterruptedException | BrokenBarrierException e) {
-              e.printStackTrace();
-            }finally {
-              latch.countDown();
-            }
-          }
-        }).start();
-      }
-      latch.await();
-    }
-  }
-
-}
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.