You are viewing a plain-text version of this content; the canonical link to the original was not preserved in this copy.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2018/05/28 09:01:50 UTC
[rocketmq] 02/02: Revert "Fix Concurrent issue of StoreStatsService"
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch revert-261-fix-store-stat-concurrent
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit ae48e2a1582f67a8ffa9c1484306f2474aef9da8
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Mon May 28 17:01:46 2018 +0800
Revert "Fix Concurrent issue of StoreStatsService"
This reverts commit fdc6e87cbe7adb8bc8f5991e06392eee73bc5151.
---
.../apache/rocketmq/store/StoreStatsService.java | 15 ++---
.../rocketmq/store/StoreStatsServiceTest.java | 77 ----------------------
2 files changed, 4 insertions(+), 88 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
index 8862fd7..586947c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreStatsService.java
@@ -21,7 +21,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.rocketmq.common.ServiceThread;
@@ -43,9 +42,9 @@ public class StoreStatsService extends ServiceThread {
private final AtomicLong putMessageFailedTimes = new AtomicLong(0);
- private final ConcurrentMap<String, AtomicLong> putMessageTopicTimesTotal =
+ private final Map<String, AtomicLong> putMessageTopicTimesTotal =
new ConcurrentHashMap<String, AtomicLong>(128);
- private final ConcurrentMap<String, AtomicLong> putMessageTopicSizeTotal =
+ private final Map<String, AtomicLong> putMessageTopicSizeTotal =
new ConcurrentHashMap<String, AtomicLong>(128);
private final AtomicLong getMessageTimesTotalFound = new AtomicLong(0);
@@ -546,10 +545,7 @@ public class StoreStatsService extends ServiceThread {
AtomicLong rs = putMessageTopicSizeTotal.get(topic);
if (null == rs) {
rs = new AtomicLong(0);
- AtomicLong previous = putMessageTopicSizeTotal.putIfAbsent(topic, rs);
- if(previous != null){
- rs = previous;
- }
+ putMessageTopicSizeTotal.put(topic, rs);
}
return rs;
}
@@ -558,10 +554,7 @@ public class StoreStatsService extends ServiceThread {
AtomicLong rs = putMessageTopicTimesTotal.get(topic);
if (null == rs) {
rs = new AtomicLong(0);
- AtomicLong previous = putMessageTopicTimesTotal.putIfAbsent(topic, rs);
- if(previous != null){
- rs = previous;
- }
+ putMessageTopicTimesTotal.put(topic, rs);
}
return rs;
}
diff --git a/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java b/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java
deleted file mode 100644
index bb39bf9..0000000
--- a/store/src/test/java/org/apache/rocketmq/store/StoreStatsServiceTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-package org.apache.rocketmq.store;
-
-import java.util.concurrent.BrokenBarrierException;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.junit.Test;
-
-/**
- * @author song
- */
-public class StoreStatsServiceTest {
-
- @Test
- public void getSinglePutMessageTopicSizeTotal() throws Exception {
- final StoreStatsService storeStatsService = new StoreStatsService();
- int num = Runtime.getRuntime().availableProcessors() * 2;
- for (int j = 0; j < 100; j++) {
- final AtomicReference<AtomicLong> reference = new AtomicReference<>(null);
- final CountDownLatch latch = new CountDownLatch(num);
- final CyclicBarrier barrier = new CyclicBarrier(num);
- for (int i = 0; i < num; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- barrier.await();
- AtomicLong atomicLong = storeStatsService.getSinglePutMessageTopicSizeTotal("test");
- if(reference.compareAndSet(null,atomicLong)){
- }else if (reference.get() != atomicLong){
- throw new RuntimeException("Reference should be same!");
- }
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }finally {
- latch.countDown();
- }
- }
- }).start();
- }
- latch.await();
- }
- }
-
- @Test
- public void getSinglePutMessageTopicTimesTotal() throws Exception {
- final StoreStatsService storeStatsService = new StoreStatsService();
- int num = Runtime.getRuntime().availableProcessors() * 2;
- for (int j = 0; j < 100; j++) {
- final AtomicReference<AtomicLong> reference = new AtomicReference<>(null);
- final CountDownLatch latch = new CountDownLatch(num);
- final CyclicBarrier barrier = new CyclicBarrier(num);
- for (int i = 0; i < num; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- try {
- barrier.await();
- AtomicLong atomicLong = storeStatsService.getSinglePutMessageTopicTimesTotal("test");
- if(reference.compareAndSet(null,atomicLong)){
- }else if (reference.get() != atomicLong){
- throw new RuntimeException("Reference should be same!");
- }
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }finally {
- latch.countDown();
- }
- }
- }).start();
- }
- latch.await();
- }
- }
-
-}
\ No newline at end of file
--
To stop receiving notification emails like this one, please contact
lizhanhui@apache.org.