Posted to dev@geode.apache.org by Kirk Lund <kl...@pivotal.io> on 2018/08/01 17:43:04 UTC

AsyncEventListener concurrency bugs

Several AsyncEventListener implementations in our test code are not
thread-safe and this is causing tests that use them to be flaky. I
discovered this while trying to fix the flakiness in some disabled tests in
AsyncEventListenerDUnitTest.

The issue here is that AsyncEventListener.processEvents can be invoked by
Geode with multiple threads concurrently. So, any implementation should
either not use state in the AsyncEventListener implementation or it should
do so in a thread-safe way.

The following are AsyncEventListener implementations in the test source:

1) org.apache.geode.internal.cache.wan.MyAsyncEventListener2

(this example fixes existing flakiness in several tests)

 25 public class MyAsyncEventListener2 implements AsyncEventListener {
 26
 27   private Map<Integer, List<GatewaySenderEventImpl>> bucketToEventsMap;
 28
 29   public MyAsyncEventListener2() {
 30     this.bucketToEventsMap = new HashMap<Integer, List<GatewaySenderEventImpl>>();
 31   }
 32
 33   public boolean processEvents(List<AsyncEvent> events) {
 34     for (AsyncEvent event : events) {
 35       GatewaySenderEventImpl gatewayEvent = (GatewaySenderEventImpl) event;
 36       int bucketId = gatewayEvent.getBucketId();
 37       List<GatewaySenderEventImpl> bucketEvents = this.bucketToEventsMap.get(bucketId);
 38       if (bucketEvents == null) {
 39         bucketEvents = new ArrayList<GatewaySenderEventImpl>();
 40         bucketEvents.add(gatewayEvent);
 41         this.bucketToEventsMap.put(bucketId, bucketEvents);
 42       } else {
 43         bucketEvents.add(gatewayEvent);
 44       }
 45     }
 46     return true;
 47   }

Multiple threads can invoke processEvents concurrently. Try enabling
AsyncEventListenerDUnitTest.testParallelAsyncEventQueueHA_Scenario1 by
removing the @Ignore annotation and then run until failure in IntelliJ.
You'll see it fail.

Now edit
AsyncEventQueueTestBase.verifyAsyncEventListenerForPossibleDuplicates:

@@ -20,6 +20,7 @@ import static org.apache.geode.distributed.ConfigurationProperties.MCAST_PORT;
 import static org.apache.geode.distributed.ConfigurationProperties.OFF_HEAP_MEMORY_SIZE;
 import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
 import static org.apache.geode.distributed.ConfigurationProperties.START_LOCATOR;
+import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
@@ -1343,6 +1344,7 @@ public class AsyncEventQueueTestBase extends JUnit4DistributedTestCase {
       LogWriterUtils.getLogWriter()
           .info("Events for bucket: " + bucketId + " is " + eventsForBucket);
       assertNotNull(eventsForBucket);
+      assertThat(eventsForBucket.size()).as("bucketToEventsMap: " + bucketToEventsMap).isEqualTo(batchSize);
       for (int i = 0; i < batchSize; i++) {
         GatewaySenderEventImpl senderEvent = eventsForBucket.get(i);
         assertTrue(senderEvent.getPossibleDuplicate());

Now if you run until failure, you'll see that the contents of
bucketToEventsMap are a mess, including duplicate keys.

The reason is that two threads executing MyAsyncEventListener2.processEvents
concurrently both see bucketEvents == null on line 38 so they both perform
their own put on line 41.
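
To make that interleaving concrete, here is a single-threaded replay of it.
This is only a sketch: CheckThenActReplay and the String events are
hypothetical stand-ins, not Geode code, and it shows just the lost-update
case. Real concurrent mutation of an unsynchronized HashMap can corrupt the
map in other ways as well.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in that replays, in one thread, the order of steps two
// threads can take through the check-then-act code in processEvents.
class CheckThenActReplay {

  // Returns how many events end up stored for the bucket after both
  // simulated threads have each processed one event.
  static int replay() {
    Map<Integer, List<String>> bucketToEventsMap = new HashMap<>();
    int bucketId = 7;

    // Both "threads" perform the lookup (line 37) before either has put.
    List<String> seenByThread1 = bucketToEventsMap.get(bucketId);
    List<String> seenByThread2 = bucketToEventsMap.get(bucketId);

    // Thread 1 sees null (line 38), creates its list and puts it (line 41).
    if (seenByThread1 == null) {
      seenByThread1 = new ArrayList<>();
      seenByThread1.add("event-from-thread-1");
      bucketToEventsMap.put(bucketId, seenByThread1);
    }

    // Thread 2 acts on its stale null and replaces thread 1's list.
    if (seenByThread2 == null) {
      seenByThread2 = new ArrayList<>();
      seenByThread2.add("event-from-thread-2");
      bucketToEventsMap.put(bucketId, seenByThread2);
    }

    return bucketToEventsMap.get(bucketId).size();
  }

  public static void main(String[] args) {
    // Two events were processed, but only one list survives in the map.
    System.out.println("events stored for bucket: " + replay());
  }
}
```

Two events go in and one comes out, because the second put discards the list
that already held the first event.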

Changing MyAsyncEventListener2's bucketToEventsMap from HashMap to
ConcurrentHashMap isn't the best solution because the method reads and
mutates bucketToEventsMap across multiple lines (i.e. non-atomically).
In this case, the better fix is to mark processEvents as synchronized.
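
As a rough illustration of the synchronized approach, the sketch below uses
stand-in types so that it runs without Geode on the classpath. The names
SynchronizedBucketListener, Event, eventCount and runConcurrently are all
hypothetical; the real listener would keep implementing AsyncEventListener
and work with GatewaySenderEventImpl.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for MyAsyncEventListener2 with the proposed fix.
class SynchronizedBucketListener {

  // Stand-in for GatewaySenderEventImpl: only carries what the sketch needs.
  static final class Event {
    final int bucketId;
    final int sequence;

    Event(int bucketId, int sequence) {
      this.bucketId = bucketId;
      this.sequence = sequence;
    }
  }

  private final Map<Integer, List<Event>> bucketToEventsMap = new HashMap<>();

  // synchronized makes the whole get / null check / put / add sequence
  // atomic with respect to other callers of this listener.
  public synchronized boolean processEvents(List<Event> events) {
    for (Event event : events) {
      List<Event> bucketEvents = bucketToEventsMap.get(event.bucketId);
      if (bucketEvents == null) {
        bucketEvents = new ArrayList<>();
        bucketToEventsMap.put(event.bucketId, bucketEvents);
      }
      bucketEvents.add(event);
    }
    return true;
  }

  // Readers take the same lock so they never observe a half-updated map.
  public synchronized int eventCount(int bucketId) {
    List<Event> bucketEvents = bucketToEventsMap.get(bucketId);
    return bucketEvents == null ? 0 : bucketEvents.size();
  }

  // Drives one listener from several threads, all targeting bucket 0, and
  // returns the number of events recorded for that bucket.
  static int runConcurrently(int threads, int eventsPerThread) {
    SynchronizedBucketListener listener = new SynchronizedBucketListener();
    List<Thread> workers = new ArrayList<>();
    for (int t = 0; t < threads; t++) {
      Thread worker = new Thread(() -> {
        for (int i = 0; i < eventsPerThread; i++) {
          List<Event> batch = new ArrayList<>();
          batch.add(new Event(0, i));
          listener.processEvents(batch);
        }
      });
      workers.add(worker);
      worker.start();
    }
    for (Thread worker : workers) {
      try {
        worker.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted while waiting for workers", e);
      }
    }
    return listener.eventCount(0);
  }

  public static void main(String[] args) {
    System.out.println("events recorded: " + runConcurrently(8, 1000));
  }
}
```

Any code that reads the map from the test side would need to go through a
synchronized accessor like eventCount as well, otherwise the reader can still
race with processEvents.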

2) org.apache.geode.internal.cache.wan.asyncqueue.AbstractMovingAsyncEventListener

(this example helps prevent future flakiness with defensive coding by
proper encapsulation)

This one doesn't cause any flakiness *yet* but because the member fields
are non-private, it's very easy for a developer to write a new flaky test
simply by modifying the values of these member fields from a sub-class or
from any test within the same package.


 28 public abstract class AbstractMovingAsyncEventListener implements AsyncEventListener {
 29   protected final DistributedMember destination;
 30   boolean moved;
 31   Set<Object> keysSeen = new HashSet<Object>();
 32
 33   public AbstractMovingAsyncEventListener(final DistributedMember destination) {
 34     this.destination = destination;
 35   }
 36
 37   @Override
 38   public boolean processEvents(final List<AsyncEvent> events) {
 39     if (!moved) {
 40
 41       AsyncEvent event1 = events.get(0);
 42       move(event1);
 43       moved = true;
 44       return false;
 45     }
 46
 47     events.stream().map(AsyncEvent::getKey).forEach(keysSeen::add);
 48     return true;
 49   }

Imagine thread-1 is reading moved on line 39 while thread-2 is modifying
moved on line 43. Now imagine that thread-3 is modifying moved directly
from a test or sub-class:

listener.moved = false;

This creates a mess that more defensive coding could prevent (i.e. by making
the member fields private and thus more encapsulated). Alternatively, if we
need to have a sub-class reset moved, then we would probably need to make
processEvents synchronized. It would still be safest to make moved private
and keep all access and mutation of moved within
AbstractMovingAsyncEventListener so that the concurrency (or lack of it) is
fully visible when looking at that class. Having to look for how
sub-classes manipulate a non-private field slows the developer down and
risks overlooking something unexpected.

The two subclasses of AbstractMovingAsyncEventListener in
AsyncEventListenerDUnitTest do NOT currently access or mutate any of the
member fields directly, so there's no reason they shouldn't just be private.
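
One possible shape for the encapsulated version is sketched below. It is not
the actual Geode class: EncapsulatedMovingListener, CountingMovingListener,
getKeysSeen, hasMoved and getMoveCount are hypothetical names, and events are
reduced to their keys so the example runs on its own. It also assumes we want
processEvents synchronized in addition to making the fields private.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for AbstractMovingAsyncEventListener. All state is
// private and only touched while holding the lock on "this".
abstract class EncapsulatedMovingListener {

  private boolean moved;
  private final Set<Object> keysSeen = new HashSet<>();

  // Sub-classes decide how to move; they cannot see or reset "moved".
  // Note that this is called while the listener's lock is held.
  protected abstract void move(Object firstKey);

  public synchronized boolean processEvents(List<?> keys) {
    if (!moved) {
      move(keys.get(0));
      moved = true;
      return false; // reject the first batch so it gets redelivered
    }
    keysSeen.addAll(keys);
    return true;
  }

  // Hand out a copy so callers cannot mutate the internal set.
  public synchronized Set<Object> getKeysSeen() {
    return new HashSet<>(keysSeen);
  }

  public synchronized boolean hasMoved() {
    return moved;
  }
}

// Example sub-class that only counts how often move() was invoked.
class CountingMovingListener extends EncapsulatedMovingListener {

  private final AtomicInteger moveCount = new AtomicInteger();

  @Override
  protected void move(Object firstKey) {
    moveCount.incrementAndGet();
  }

  int getMoveCount() {
    return moveCount.get();
  }
}
```

With this layout every read and write of moved is visible in the one class,
and a test that wants to inspect keysSeen has to go through getKeysSeen
rather than reaching into the field.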