You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ds...@apache.org on 2016/10/28 21:41:43 UTC
[89/98] [abbrv] incubator-geode git commit: GEODE-2027:
ParallelQueueRemovalMessage processing removes events from the region and
temp queue
GEODE-2027: ParallelQueueRemovalMessage processing removes events from the region and temp queue
Project: http://git-wip-us.apache.org/repos/asf/incubator-geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-geode/commit/b10a171e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-geode/tree/b10a171e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-geode/diff/b10a171e
Branch: refs/heads/feature/GEM-983
Commit: b10a171e11990a566ed42560a903668908131390
Parents: 87f2fb5
Author: Barry Oglesby <bo...@pivotal.io>
Authored: Fri Oct 21 12:40:41 2016 -0700
Committer: Barry Oglesby <bo...@pivotal.io>
Committed: Thu Oct 27 09:17:35 2016 -0700
----------------------------------------------------------------------
.../cache/wan/AbstractGatewaySender.java | 2 +-
.../parallel/ParallelQueueRemovalMessage.java | 22 +-
.../internal/cache/BucketRegionQueueHelper.java | 58 +++++
.../ParallelQueueRemovalMessageJUnitTest.java | 256 +++++++++++++++++++
.../java/org/apache/geode/test/fake/Fakes.java | 5 +-
5 files changed, 326 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 33119bc..e1c9010 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -684,7 +684,7 @@ public abstract class AbstractGatewaySender implements GatewaySender, Distributi
return null;
}
- final public Set<RegionQueue> getQueues() {
+ public Set<RegionQueue> getQueues() {
if (this.eventProcessor != null) {
if (!(this.eventProcessor instanceof ConcurrentSerialGatewaySenderEventProcessor)) {
Set<RegionQueue> queues = new HashSet<RegionQueue>();
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
index a363b5d..bad3d3c 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessage.java
@@ -135,22 +135,14 @@ public class ParallelQueueRemovalMessage extends PooledDistributionMessage {
afterAckForSecondary_EventInBucket(abstractSender, brq, key);
destroyKeyFromBucketQueue(brq, key, region);
isDestroyed = true;
- } else {
- // if BucketRegionQueue does not have the key, it
- // should be in tempQueue
- // remove it from there..defect #49196
- isDestroyed =
- destroyFromTempQueue(brq.getPartitionedRegion(), (Integer) bId, key);
- }
- if (!isDestroyed) {
- // event is neither destroyed from BucketRegionQueue nor from tempQueue
- brq.addToFailedBatchRemovalMessageKeys(key);
- if (isDebugEnabled) {
- logger.debug(
- "Event is neither destroyed from BucketRegionQueue not from tempQueue. Added to failedBatchRemovalMessageKeys: {}",
- key);
- }
}
+
+ // Even if BucketRegionQueue does not have the key, it could be in the tempQueue
+ // remove it from there..defect #49196
+ destroyFromTempQueue(brq.getPartitionedRegion(), (Integer) bId, key);
+
+ // Finally, add the key to the failed batch removal keys so that it is definitely removed from the bucket region queue
+ brq.addToFailedBatchRemovalMessageKeys(key);
} finally {
brq.getInitializationLock().readLock().unlock();
}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java
new file mode 100644
index 0000000..68b29c2
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/BucketRegionQueueHelper.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ *
+ */
+package org.apache.geode.internal.cache;
+
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Helper class in the internal cache package to access protected BucketRegionQueue methods.
+ */
+public class BucketRegionQueueHelper {
+
+ private BucketRegionQueue bucketRegionQueue;
+
+ public BucketRegionQueueHelper(GemFireCacheImpl cache, PartitionedRegion queueRegion, BucketRegionQueue bucketRegionQueue) {
+ this.bucketRegionQueue = bucketRegionQueue;
+ initialize(cache, queueRegion);
+ }
+
+ public GatewaySenderEventImpl addEvent(Object key) {
+ this.bucketRegionQueue.getEventTracker().setInitialized();
+ this.bucketRegionQueue.entries.disableLruUpdateCallback();
+ GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+ this.bucketRegionQueue.entries.initialImagePut(key, 0, event, false, false, null, null, false);
+ this.bucketRegionQueue.entries.enableLruUpdateCallback();
+ return event;
+ }
+
+ public void cleanUpDestroyedTokensAndMarkGIIComplete() {
+ this.bucketRegionQueue.cleanUpDestroyedTokensAndMarkGIIComplete(InitialImageOperation.GIIStatus.NO_GII);
+ }
+
+ public void initialize(GemFireCacheImpl cache, PartitionedRegion queueRegion) {
+ InternalDistributedMember member = cache.getMyId();
+ when(queueRegion.getMyId()).thenReturn(member);
+ when(cache.getRegionByPath(this.bucketRegionQueue.getFullPath())).thenReturn(this.bucketRegionQueue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
new file mode 100644
index 0000000..cc9caaf
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelQueueRemovalMessageJUnitTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.geode.internal.cache.wan.parallel;
+
+import org.apache.geode.CancelCriterion;
+import org.apache.geode.cache.*;
+import org.apache.geode.internal.cache.*;
+import org.apache.geode.internal.cache.lru.LRUAlgorithm;
+import org.apache.geode.internal.cache.partitioned.RegionAdvisor;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.test.fake.Fakes;
+import org.apache.geode.test.junit.categories.UnitTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.*;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@Category(UnitTest.class)
+public class ParallelQueueRemovalMessageJUnitTest {
+
+ private GemFireCacheImpl cache;
+ private PartitionedRegion queueRegion;
+ private AbstractGatewaySender sender;
+ private PartitionedRegion rootRegion;
+ private BucketRegionQueue bucketRegionQueue;
+ private BucketRegionQueueHelper bucketRegionQueueHelper;
+
+ private static String GATEWAY_SENDER_ID = "ny";
+ private static int BUCKET_ID = 85;
+ private static long KEY = 198l;
+
+ @Before
+ public void setUpGemFire() {
+ createCache();
+ createQueueRegion();
+ createGatewaySender();
+ createRootRegion();
+ createBucketRegionQueue();
+ }
+
+ private void createCache() {
+ // Mock cache
+ this.cache = Fakes.cache();
+ GemFireCacheImpl.setInstanceForTests(this.cache);
+ }
+
+ private void createQueueRegion() {
+ // Mock queue region
+ this.queueRegion = mock(PartitionedRegion.class);
+ when(this.queueRegion.getFullPath()).thenReturn(getRegionQueueName());
+ when(this.queueRegion.getPrStats()).thenReturn(mock(PartitionedRegionStats.class));
+ when(this.queueRegion.getDataStore()).thenReturn(mock(PartitionedRegionDataStore.class));
+ when(this.queueRegion.getCache()).thenReturn(this.cache);
+ EvictionAttributesImpl ea = (EvictionAttributesImpl) EvictionAttributes.createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK);
+ LRUAlgorithm algorithm = ea.createEvictionController(this.queueRegion, false);
+ algorithm.getLRUHelper().initStats(this.queueRegion, this.cache.getDistributedSystem());
+ when(this.queueRegion.getEvictionController()).thenReturn(algorithm);
+ }
+
+ private void createGatewaySender() {
+ // Mock gateway sender
+ this.sender = mock(AbstractGatewaySender.class);
+ when(this.queueRegion.getParallelGatewaySender()).thenReturn(this.sender);
+ when(this.sender.getQueues()).thenReturn(null);
+ when(this.sender.getDispatcherThreads()).thenReturn(1);
+ when(this.sender.getCache()).thenReturn(this.cache);
+ CancelCriterion cancelCriterion = mock(CancelCriterion.class);
+ when(sender.getCancelCriterion()).thenReturn(cancelCriterion);
+ }
+
+ private void createRootRegion() {
+ // Mock root region
+ this.rootRegion = mock(PartitionedRegion.class);
+ when(this.rootRegion.getFullPath()).thenReturn(Region.SEPARATOR+PartitionedRegionHelper.PR_ROOT_REGION_NAME);
+ when(this.cache.getRegion(PartitionedRegionHelper.PR_ROOT_REGION_NAME, true)).thenReturn(this.rootRegion);
+ when(this.cache.getRegion(getRegionQueueName(), false)).thenReturn(this.queueRegion);
+ }
+
+ private void createBucketRegionQueue() {
+ // Create InternalRegionArguments
+ InternalRegionArguments ira = new InternalRegionArguments();
+ ira.setPartitionedRegion(this.queueRegion);
+ ira.setPartitionedRegionBucketRedundancy(1);
+ BucketAdvisor ba = mock(BucketAdvisor.class);
+ ira.setBucketAdvisor(ba);
+ InternalRegionArguments pbrIra = new InternalRegionArguments();
+ RegionAdvisor ra = mock(RegionAdvisor.class);
+ when(ra.getPartitionedRegion()).thenReturn(this.queueRegion);
+ pbrIra.setPartitionedRegionAdvisor(ra);
+ PartitionAttributes pa = mock(PartitionAttributes.class);
+ when(this.queueRegion.getPartitionAttributes()).thenReturn(pa);
+ when(this.queueRegion.getDataPolicy()).thenReturn(DataPolicy.PARTITION);
+ when(pa.getColocatedWith()).thenReturn(null);
+ ProxyBucketRegion pbr = new ProxyBucketRegion(BUCKET_ID, this.queueRegion, pbrIra); // final classes cannot be mocked
+ when(ba.getProxyBucketRegion()).thenReturn(pbr);
+
+ // Create RegionAttributes
+ AttributesFactory factory = new AttributesFactory();
+ factory.setScope(Scope.DISTRIBUTED_ACK);
+ factory.setDataPolicy(DataPolicy.REPLICATE);
+ factory.setEvictionAttributes(EvictionAttributes.createLRUMemoryAttributes(100, null, EvictionAction.OVERFLOW_TO_DISK));
+ RegionAttributes attributes = factory.create();
+
+ // Create BucketRegionQueue
+ this.bucketRegionQueue = new BucketRegionQueue(this.queueRegion.getBucketName(BUCKET_ID), attributes, this.rootRegion, this.cache, ira);
+ this.bucketRegionQueueHelper = new BucketRegionQueueHelper(this.cache, this.queueRegion, this.bucketRegionQueue);
+ }
+
+ @After
+ public void tearDownGemFire() {
+ GemFireCacheImpl.setInstanceForTests(null);
+ }
+
+ @Test
+ public void validateFailedBatchRemovalMessageKeysInUninitializedBucketRegionQueue() throws Exception {
+ // Validate initial BucketRegionQueue state
+ assertFalse(this.bucketRegionQueue.isInitialized());
+ assertEquals(0, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+
+ // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to add a key)
+ createAndProcessParallelQueueRemovalMessage();
+
+ // Validate BucketRegionQueue after processing ParallelQueueRemovalMessage
+ assertEquals(1, this.bucketRegionQueue.getFailedBatchRemovalMessageKeys().size());
+ }
+
+ @Test
+ public void validateDestroyKeyFromBucketQueueInUninitializedBucketRegionQueue() throws Exception {
+ // Validate initial BucketRegionQueue state
+ assertEquals(0, this.bucketRegionQueue.size());
+ assertFalse(this.bucketRegionQueue.isInitialized());
+
+ // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
+ this.bucketRegionQueueHelper.addEvent(KEY);
+ assertEquals(1, this.bucketRegionQueue.size());
+
+ // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to DESTROYED)
+ when(this.queueRegion.getKeyInfo(KEY, null, null)).thenReturn(new KeyInfo(KEY, null, null));
+ createAndProcessParallelQueueRemovalMessage();
+
+ // Clean up destroyed tokens and validate BucketRegionQueue
+ this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
+ assertEquals(0, this.bucketRegionQueue.size());
+ }
+
+ @Test
+ public void validateDestroyFromTempQueueInUninitializedBucketRegionQueue() throws Exception {
+ // Validate initial BucketRegionQueue state
+ assertFalse(this.bucketRegionQueue.isInitialized());
+
+ // Create a real ConcurrentParallelGatewaySenderQueue
+ ParallelGatewaySenderEventProcessor pgsep = createConcurrentParallelGatewaySenderQueue();
+
+ // Add a mock GatewaySenderEventImpl to the temp queue
+ BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(pgsep, mock(GatewaySenderEventImpl.class));
+ assertEquals(1, tempQueue.size());
+
+ // Create and process a ParallelQueueRemovalMessage (causes the failedBatchRemovalMessageKeys to add a key)
+ createAndProcessParallelQueueRemovalMessage();
+
+ // Validate temp queue is empty after processing ParallelQueueRemovalMessage
+ assertEquals(0, tempQueue.size());
+ }
+
+ @Test
+ public void validateDestroyFromBucketQueueAndTempQueueInUninitializedBucketRegionQueue() {
+ // Validate initial BucketRegionQueue state
+ assertFalse(this.bucketRegionQueue.isInitialized());
+ assertEquals(0, this.bucketRegionQueue.size());
+
+ // Create a real ConcurrentParallelGatewaySenderQueue
+ ParallelGatewaySenderEventProcessor pgsep = createConcurrentParallelGatewaySenderQueue();
+
+ // Add an event to the BucketRegionQueue and verify BucketRegionQueue state
+ GatewaySenderEventImpl gsei = this.bucketRegionQueueHelper.addEvent(KEY);
+ assertEquals(1, this.bucketRegionQueue.size());
+
+ // Add a mock GatewaySenderEventImpl to the temp queue
+ BlockingQueue<GatewaySenderEventImpl> tempQueue = createTempQueueAndAddEvent(pgsep, gsei);
+ assertEquals(1, tempQueue.size());
+
+ // Create and process a ParallelQueueRemovalMessage (causes the value of the entry to be set to DESTROYED)
+ when(this.queueRegion.getKeyInfo(KEY, null, null)).thenReturn(new KeyInfo(KEY, null, null));
+ createAndProcessParallelQueueRemovalMessage();
+
+ // Validate temp queue is empty after processing ParallelQueueRemovalMessage
+ assertEquals(0, tempQueue.size());
+
+ // Clean up destroyed tokens
+ this.bucketRegionQueueHelper.cleanUpDestroyedTokensAndMarkGIIComplete();
+
+ // Validate BucketRegionQueue is empty after processing ParallelQueueRemovalMessage
+ assertEquals(0, this.bucketRegionQueue.size());
+ }
+
+ private void createAndProcessParallelQueueRemovalMessage() {
+ ParallelQueueRemovalMessage pqrm = new ParallelQueueRemovalMessage(createRegionToDispatchedKeysMap());
+ pqrm.process(null);
+ }
+
+ private HashMap<String, Map<Integer, List<Long>>> createRegionToDispatchedKeysMap() {
+ HashMap<String, Map<Integer, List<Long>>> regionToDispatchedKeys = new HashMap<>();
+ Map<Integer, List<Long>> bucketIdToDispatchedKeys = new HashMap<>();
+ List<Long> dispatchedKeys = new ArrayList<>();
+ dispatchedKeys.add(KEY);
+ bucketIdToDispatchedKeys.put(BUCKET_ID, dispatchedKeys);
+ regionToDispatchedKeys.put(getRegionQueueName(), bucketIdToDispatchedKeys);
+ return regionToDispatchedKeys;
+ }
+
+ private ParallelGatewaySenderEventProcessor createConcurrentParallelGatewaySenderQueue() {
+ ParallelGatewaySenderEventProcessor pgsep = new ParallelGatewaySenderEventProcessor(sender);
+ ConcurrentParallelGatewaySenderQueue cpgsq = new ConcurrentParallelGatewaySenderQueue(sender, new ParallelGatewaySenderEventProcessor[] {pgsep});
+ Set<RegionQueue> queues = new HashSet<>();
+ queues.add(cpgsq);
+ when(this.sender.getQueues()).thenReturn(queues);
+ return pgsep;
+ }
+
+ private BlockingQueue<GatewaySenderEventImpl> createTempQueueAndAddEvent(ParallelGatewaySenderEventProcessor pgsep, GatewaySenderEventImpl gsei) {
+ ParallelGatewaySenderQueue pgsq = (ParallelGatewaySenderQueue) pgsep.getQueue();
+ Map<Integer, BlockingQueue<GatewaySenderEventImpl>> tempQueueMap = pgsq.getBucketToTempQueueMap();
+ BlockingQueue<GatewaySenderEventImpl> tempQueue = new LinkedBlockingQueue();
+ when(gsei.getShadowKey()).thenReturn(KEY);
+ tempQueue.add(gsei);
+ tempQueueMap.put(BUCKET_ID, tempQueue);
+ return tempQueue;
+ }
+
+ private String getRegionQueueName() {
+ return Region.SEPARATOR+GATEWAY_SENDER_ID+ ParallelGatewaySenderQueue.QSTRING;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/b10a171e/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java b/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
index 93195c1..2ab64dd 100644
--- a/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
+++ b/geode-core/src/test/java/org/apache/geode/test/fake/Fakes.java
@@ -26,7 +26,7 @@ import org.apache.geode.distributed.internal.DistributionConfig;
import org.apache.geode.distributed.internal.DistributionManager;
import org.apache.geode.distributed.internal.InternalDistributedSystem;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
-import org.apache.geode.internal.cache.AbstractRegion;
+import org.apache.geode.internal.cache.CachePerfStats;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import java.io.File;
@@ -74,9 +74,11 @@ public class Fakes {
when(config.getDeployWorkingDir()).thenReturn(new File("."));
when(cache.getDistributedSystem()).thenReturn(system);
+ when(cache.getSystem()).thenReturn(system);
when(cache.getMyId()).thenReturn(member);
when(cache.getDistributionManager()).thenReturn(distributionManager);
when(cache.getCancelCriterion()).thenReturn(systemCancelCriterion);
+ when(cache.getCachePerfStats()).thenReturn(mock(CachePerfStats.class));
when(system.getDistributedMember()).thenReturn(member);
when(system.getConfig()).thenReturn(config);
@@ -88,6 +90,7 @@ public class Fakes {
when(system.createAtomicStatistics(any(), any())).thenReturn(stats);
when(distributionManager.getId()).thenReturn(member);
+ when(distributionManager.getDistributionManagerId()).thenReturn(member);
when(distributionManager.getConfig()).thenReturn(config);
when(distributionManager.getSystem()).thenReturn(system);
when(distributionManager.getCancelCriterion()).thenReturn(systemCancelCriterion);