You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicecomb.apache.org by ni...@apache.org on 2019/08/07 00:55:25 UTC

[servicecomb-pack] 02/02: SCB-1423 Use the AtomInteger counter instead of ConcurrentLinkedQueue.size()

This is an automated email from the ASF dual-hosted git repository.

ningjiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-pack.git

commit 70adb95ebc469987808f7e4e9af1fb85199f8c18
Author: Lei Zhang <co...@gmail.com>
AuthorDate: Tue Aug 6 20:12:58 2019 +0800

    SCB-1423 Use the AtomInteger counter instead of ConcurrentLinkedQueue.size()
---
 .../alpha/server/AlphaIntegrationWithRandomPortTest.java     | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)

diff --git a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
index 891f1a5..899a4fb 100644
--- a/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
+++ b/alpha/alpha-server/src/test/java/org/apache/servicecomb/pack/alpha/server/AlphaIntegrationWithRandomPortTest.java
@@ -21,6 +21,7 @@ import com.google.protobuf.ByteString;
 import io.grpc.ManagedChannel;
 import io.grpc.netty.NettyChannelBuilder;
 import io.grpc.stub.StreamObserver;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.servicecomb.pack.alpha.core.*;
 import org.apache.servicecomb.pack.common.EventType;
 import org.apache.servicecomb.pack.contract.grpc.*;
@@ -116,6 +117,7 @@ public class AlphaIntegrationWithRandomPortTest {
   @Autowired
   private TxConsistentService consistentService;
 
+  private static final AtomicInteger receivedCommandsCounter = new AtomicInteger();
   private static final Queue<GrpcCompensateCommand> receivedCommands = new ConcurrentLinkedQueue<>();
 
   private final CompensationStreamObserver compensateResponseObserver = new CompensationStreamObserver(
@@ -138,6 +140,7 @@ public class AlphaIntegrationWithRandomPortTest {
     System.out.println(" globalTxId " + globalTxId);
     eventRepo.deleteAll();
     receivedCommands.clear();
+    receivedCommandsCounter.set(0);
   }
 
   @After
@@ -264,7 +267,7 @@ public class AlphaIntegrationWithRandomPortTest {
     blockingStub.onTxEvent(eventOf(TxEndedEvent, localTxId1, parentTxId1, new byte[0], "method b"));
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
-    await().atMost(1, SECONDS).until(() -> receivedCommands.size() > 1);
+    await().atMost(1, SECONDS).until(() -> receivedCommandsCounter.get() > 1);
 
     assertThat(receivedCommands, contains(
         GrpcCompensateCommand.newBuilder().setGlobalTxId(globalTxId).setLocalTxId(localTxId1).setParentTxId(parentTxId1)
@@ -310,7 +313,7 @@ public class AlphaIntegrationWithRandomPortTest {
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
 
-    assertThat(receivedCommands.size(), is(1));
+    assertThat(receivedCommandsCounter.get(), is(1));
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
 
     anotherBlockingStub.onDisconnected(anotherServiceConfig);
@@ -325,7 +328,7 @@ public class AlphaIntegrationWithRandomPortTest {
 
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent));
 
-    await().atMost(1, SECONDS).until(() -> receivedCommands.size() == 1);
+    await().atMost(1, SECONDS).until(() -> receivedCommandsCounter.get() == 1);
 
     String localTxId1 = UUID.randomUUID().toString();
     String parentTxId1 = UUID.randomUUID().toString();
@@ -354,7 +357,7 @@ public class AlphaIntegrationWithRandomPortTest {
     blockingStub.onTxEvent(someGrpcEvent(TxAbortedEvent, globalTxId, anotherLocalTxId2));
     await().atMost(1, SECONDS).until(() -> !receivedCommands.isEmpty());
 
-    assertThat(receivedCommands.size(), is(1));
+    assertThat(receivedCommandsCounter.get(), is(1));
     assertThat(receivedCommands.poll().getGlobalTxId(), is(globalTxId));
   }
 
@@ -565,6 +568,7 @@ public class AlphaIntegrationWithRandomPortTest {
       // intercept received command
       consumer.accept(command);
       receivedCommands.add(command);
+      receivedCommandsCounter.incrementAndGet();
     }
 
     @Override