You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by mh...@apache.org on 2019/12/18 21:46:28 UTC
[geode] 01/03: GEODE-7537: use dm.getCache instead of
CacheFactory.getAnyInstance (#4486)
This is an automated email from the ASF dual-hosted git repository.
mhanson pushed a commit to branch release/1.11.0
in repository https://gitbox.apache.org/repos/asf/geode.git
commit e87b763afe5bcd12a61c978314410d0e4eadd6d4
Author: Eric Shu <es...@pivotal.io>
AuthorDate: Tue Dec 17 12:57:19 2019 -0800
GEODE-7537: use dm.getCache instead of CacheFactory.getAnyInstance (#4486)
(cherry picked from commit 53f851c94b6ee2e62e8e16f3cd73e77cec54eb19)
---
...aySenderQueueEntrySynchronizationOperation.java | 21 +++----
.../wan/parallel/ParallelGatewaySenderQueue.java | 12 ++--
...nderQueueEntrySynchronizationOperationTest.java | 71 ++++++++++++++++++++++
.../ParallelGatewaySenderQueueJUnitTest.java | 28 +++++++++
4 files changed, 114 insertions(+), 18 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
index 08d1e5b..c13b3c5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperation.java
@@ -27,7 +27,6 @@ import org.apache.logging.log4j.Logger;
import org.apache.geode.DataSerializer;
import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
import org.apache.geode.cache.wan.GatewayQueueEvent;
import org.apache.geode.cache.wan.GatewaySender;
import org.apache.geode.distributed.internal.ClusterDistributionManager;
@@ -41,6 +40,7 @@ import org.apache.geode.distributed.internal.ReplyProcessor21;
import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.InternalRegion;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.versions.VersionTag;
@@ -95,7 +95,7 @@ public class GatewaySenderQueueEntrySynchronizationOperation {
}
protected GemFireCacheImpl getCache() {
- return (GemFireCacheImpl) CacheFactory.getAnyInstance();
+ return (GemFireCacheImpl) region.getDistributionManager().getCache();
}
private void initializeEntriesToSynchronize(
@@ -163,8 +163,8 @@ public class GatewaySenderQueueEntrySynchronizationOperation {
}
}
- private Cache getCache() {
- return CacheFactory.getAnyInstance();
+ Cache getCache() {
+ return dmgr.getCache();
}
}
@@ -198,7 +198,7 @@ public class GatewaySenderQueueEntrySynchronizationOperation {
logger.debug("{}: Providing synchronization region={}; entriesToSynchronize={}",
getClass().getSimpleName(), this.regionPath, this.entriesToSynchronize);
}
- result = getSynchronizationEvents();
+ result = getSynchronizationEvents(dm.getCache());
} catch (Throwable t) {
replyException = new ReplyException(t);
} finally {
@@ -218,15 +218,14 @@ public class GatewaySenderQueueEntrySynchronizationOperation {
}
}
- private Object getSynchronizationEvents() {
+ private Object getSynchronizationEvents(InternalCache cache) {
List<Map<String, GatewayQueueEvent>> results = new ArrayList<>();
// Get the region
- GemFireCacheImpl gfci = (GemFireCacheImpl) getCache();
- LocalRegion region = (LocalRegion) gfci.getRegion(this.regionPath);
+ LocalRegion region = (LocalRegion) cache.getRegion(this.regionPath);
// Add the appropriate GatewaySenderEventImpl from each GatewaySender for each entry
Set<String> allGatewaySenderIds = region.getAllGatewaySenderIds();
- for (GatewaySender sender : gfci.getAllGatewaySenders()) {
+ for (GatewaySender sender : cache.getAllGatewaySenders()) {
if (allGatewaySenderIds.contains(sender.getId())) {
for (GatewaySenderQueueEntrySynchronizationEntry entry : this.entriesToSynchronize) {
Map<String, GatewayQueueEvent> resultForOneEntry = new HashMap<>();
@@ -243,10 +242,6 @@ public class GatewaySenderQueueEntrySynchronizationOperation {
return results;
}
- private Cache getCache() {
- return CacheFactory.getAnyInstance();
- }
-
@Override
public int getDSFID() {
return GATEWAY_SENDER_QUEUE_ENTRY_SYNCHRONIZATION_MESSAGE;
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
index 35f3b20..1b26b60 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueue.java
@@ -659,9 +659,10 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
boolean putDone = false;
// Can this region ever be null? Should we work with regionName and not with region
// instance.
- // It can't be as put is happeing on the region and its still under process
+ // It can't be as put is happening on the region and it's still under process
GatewaySenderEventImpl value = (GatewaySenderEventImpl) object;
- boolean isDREvent = isDREvent(value);
+
+ boolean isDREvent = isDREvent(sender.getCache(), value);
Region region = value.getRegion();
String regionPath = null;
@@ -922,8 +923,9 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
return prQ;
}
- private boolean isDREvent(GatewaySenderEventImpl event) {
- return (event.getRegion() instanceof DistributedRegion) ? true : false;
+ boolean isDREvent(InternalCache cache, GatewaySenderEventImpl event) {
+ Region region = cache.getRegion(event.getRegionPath());
+ return region instanceof DistributedRegion;
}
/**
@@ -1001,7 +1003,7 @@ public class ParallelGatewaySenderQueue implements RegionQueue {
int bucketId = -1;
Object key = null;
if (event.getRegion() != null) {
- if (isDREvent(event)) {
+ if (isDREvent(sender.getCache(), event)) {
prQ = this.userRegionNameToshadowPRMap.get(event.getRegion().getFullPath());
bucketId = event.getEventId().getBucketID();
key = event.getEventId();
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java
new file mode 100644
index 0000000..e9f0466
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderQueueEntrySynchronizationOperationTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.GemFireCacheImpl;
+import org.apache.geode.internal.cache.InitialImageOperation;
+import org.apache.geode.internal.cache.InternalRegion;
+
+public class GatewaySenderQueueEntrySynchronizationOperationTest {
+ private DistributionManager distributionManager;
+ private InternalDistributedMember recipient;
+ private GatewaySenderQueueEntrySynchronizationOperation operation;
+ private InternalRegion region;
+ private GemFireCacheImpl cache;
+
+ @Before
+ public void setup() {
+ distributionManager = mock(DistributionManager.class, RETURNS_DEEP_STUBS);
+ recipient = mock(InternalDistributedMember.class);
+ region = mock(InternalRegion.class);
+ cache = mock(GemFireCacheImpl.class);
+ }
+
+ @Test
+ public void ReplyProcessorGetCacheDelegateToDistributionManager() {
+ operation = mock(GatewaySenderQueueEntrySynchronizationOperation.class);
+ GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationReplyProcessor processor =
+ new GatewaySenderQueueEntrySynchronizationOperation.GatewaySenderQueueEntrySynchronizationReplyProcessor(
+ distributionManager, recipient, operation);
+ when(distributionManager.getCache()).thenReturn(cache);
+
+ assertThat(processor.getCache()).isEqualTo(cache);
+ }
+
+ @Test
+ public void GatewaySenderQueueEntrySynchronizationOperationGetCacheDelegateToDistributionManager() {
+ InitialImageOperation.Entry entry = mock(InitialImageOperation.Entry.class);
+ List<InitialImageOperation.Entry> list = new ArrayList<>();
+ list.add(entry);
+ operation = new GatewaySenderQueueEntrySynchronizationOperation(recipient, region, list);
+ when(region.getDistributionManager()).thenReturn(distributionManager);
+ when(distributionManager.getCache()).thenReturn(cache);
+
+ assertThat(operation.getCache()).isEqualTo(cache);
+ }
+}
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
index d03a15c..88c3e45 100644
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderQueueJUnitTest.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache.wan.parallel;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
@@ -39,6 +40,7 @@ import org.apache.geode.cache.PartitionAttributesFactory;
import org.apache.geode.cache.Region;
import org.apache.geode.internal.cache.AbstractBucketRegionQueue;
import org.apache.geode.internal.cache.BucketRegionQueue;
+import org.apache.geode.internal.cache.DistributedRegion;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.PartitionedRegion;
import org.apache.geode.internal.cache.PartitionedRegionDataStore;
@@ -155,6 +157,32 @@ public class ParallelGatewaySenderQueueJUnitTest {
assertEquals(3, queue.localSize());
}
+ @Test
+ public void isDREventReturnsTrueForDistributedRegionEvent() {
+ String regionPath = "regionPath";
+ GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+ when(event.getRegionPath()).thenReturn(regionPath);
+ DistributedRegion region = mock(DistributedRegion.class);
+ when(cache.getRegion(regionPath)).thenReturn(region);
+ ParallelGatewaySenderQueue queue = mock(ParallelGatewaySenderQueue.class);
+ when(queue.isDREvent(cache, event)).thenCallRealMethod();
+
+ assertThat(queue.isDREvent(cache, event)).isTrue();
+ }
+
+ @Test
+ public void isDREventReturnsFalseForPartitionedRegionEvent() {
+ String regionPath = "regionPath";
+ GatewaySenderEventImpl event = mock(GatewaySenderEventImpl.class);
+ when(event.getRegionPath()).thenReturn(regionPath);
+ PartitionedRegion region = mock(PartitionedRegion.class);
+ when(cache.getRegion(regionPath)).thenReturn(region);
+ ParallelGatewaySenderQueue queue = mock(ParallelGatewaySenderQueue.class);
+ when(queue.isDREvent(cache, event)).thenCallRealMethod();
+
+ assertThat(queue.isDREvent(cache, event)).isFalse();
+ }
+
private PartitionedRegion mockPR(String name) {
PartitionedRegion region = mock(PartitionedRegion.class);
when(region.getFullPath()).thenReturn(name);