You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by ag...@apache.org on 2018/08/02 20:31:33 UTC
[geode] branch develop updated: GEODE-5513: Changes to build register interest initial snapshot from primary bucket (#2246)
This is an automated email from the ASF dual-hosted git repository.
agingade pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 4abb1d0 GEODE-5513: Changes to build register interest initial snapshot from primary bucket (#2246)
4abb1d0 is described below
commit 4abb1d024977fa62bb617facd40bc899f9ebf1bc
Author: agingade <ag...@pivotal.io>
AuthorDate: Thu Aug 2 13:31:28 2018 -0700
GEODE-5513: Changes to build register interest initial snapshot from primary bucket (#2246)
* Changes to build register interest initial snapshot from primary bucket
---
.../geode/internal/cache/PartitionedRegion.java | 22 ++-
.../internal/cache/PartitionedRegionTest.java | 178 +++++++++++++++++++++
2 files changed, 195 insertions(+), 5 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
index da3fcad..77175a3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/PartitionedRegion.java
@@ -3364,7 +3364,7 @@ public class PartitionedRegion extends LocalRegion
}
allowRetry = false;
} else {
- targetNode = getNodeForBucketReadOrLoad(bucketId);
+ targetNode = getBucketNodeForReadOrWrite(bucketId, clientEvent);
allowRetry = true;
}
if (targetNode == null) {
@@ -3382,6 +3382,18 @@ public class PartitionedRegion extends LocalRegion
return obj;
}
+  /**
+   * Picks the member that should serve a read of the given bucket.
+   *
+   * <p>
+   * If {@code clientEvent} is present, has an operation, and that operation reports
+   * {@code isGetForRegisterInterest()}, the member comes from
+   * {@code getNodeForBucketWrite(bucketId, null)}. In every other case (no event, no operation,
+   * or any other operation) it comes from {@code getNodeForBucketReadOrLoad(bucketId)}.
+   *
+   * @param bucketId id of the bucket being read
+   * @param clientEvent event describing the client operation; may be {@code null}
+   * @return the result of whichever lookup was selected, passed through unchanged
+   */
+  InternalDistributedMember getBucketNodeForReadOrWrite(int bucketId,
+      EntryEventImpl clientEvent) {
+    boolean isRegisterInterestGet = clientEvent != null && clientEvent.getOperation() != null
+        && clientEvent.getOperation().isGetForRegisterInterest();
+    if (isRegisterInterestGet) {
+      return getNodeForBucketWrite(bucketId, null);
+    }
+    return getNodeForBucketReadOrLoad(bucketId);
+  }
+
/**
* Execute the provided named function in all locations that contain the given keys. So function
* can be executed on just one fabric node, executed in parallel on a subset of nodes in parallel
@@ -4434,11 +4446,11 @@ public class PartitionedRegion extends LocalRegion
}
}
- private void updateNodeToBucketMap(
+ void updateNodeToBucketMap(
HashMap<InternalDistributedMember, HashMap<Integer, HashSet>> nodeToBuckets,
HashMap<Integer, HashSet> bucketKeys) {
for (int id : bucketKeys.keySet()) {
- InternalDistributedMember node = getOrCreateNodeForBucketRead(id);
+ InternalDistributedMember node = getOrCreateNodeForBucketWrite(id, null);
if (nodeToBuckets.containsKey(node)) {
nodeToBuckets.get(node).put(id, bucketKeys.get(id));
} else {
@@ -4594,10 +4606,10 @@ public class PartitionedRegion extends LocalRegion
}
}
- private void updateNodeToBucketMap(
+ void updateNodeToBucketMap(
HashMap<InternalDistributedMember, HashSet<Integer>> nodeToBuckets, Set<Integer> buckets) {
for (int id : buckets) {
- InternalDistributedMember node = getOrCreateNodeForBucketRead(id);
+ InternalDistributedMember node = getOrCreateNodeForBucketWrite(id, null);
if (nodeToBuckets.containsKey(node)) {
nodeToBuckets.get(node).add(id);
} else {
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
new file mode 100644
index 0000000..2e1db35
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/PartitionedRegionTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.AttributesFactory;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.control.InternalResourceManager;
+import org.apache.geode.test.fake.Fakes;
+
+public class PartitionedRegionTest {
+
+ String regionName = "prTestRegion";
+
+ PartitionedRegion partitionedRegion;
+
+
+  @Before
+  public void setup() {
+    // Cache from Fakes whose internal resource manager is a deep-stubbed mock.
+    InternalCache cache = Fakes.cache();
+    InternalResourceManager deepStubbedResourceManager =
+        mock(InternalResourceManager.class, RETURNS_DEEP_STUBS);
+    when(cache.getInternalResourceManager()).thenReturn(deepStubbedResourceManager);
+
+    // Region attributes: one bucket in total, one redundant copy.
+    AttributesFactory regionAttributesFactory = new AttributesFactory();
+    regionAttributesFactory.setPartitionAttributes(
+        new PartitionAttributesFactory().setTotalNumBuckets(1).setRedundantCopies(1).create());
+
+    partitionedRegion = new PartitionedRegion(regionName, regionAttributesFactory.create(), null,
+        cache, mock(InternalRegionArguments.class));
+  }
+
+  @Test
+  public void getBucketNodeForReadOrWriteReturnsPrimaryNodeForRegisterInterest() throws Exception {
+    final int bucketId = 0;
+    InternalDistributedMember primary = mock(InternalDistributedMember.class);
+    InternalDistributedMember secondary = mock(InternalDistributedMember.class);
+    // Event whose operation is a get issued on behalf of register interest.
+    EntryEventImpl registerInterestEvent = mock(EntryEventImpl.class);
+    when(registerInterestEvent.getOperation()).thenReturn(Operation.GET_FOR_REGISTER_INTEREST);
+    // Write lookups answer with the primary, read lookups with the secondary.
+    PartitionedRegion regionSpy = spy(partitionedRegion);
+    doReturn(primary).when(regionSpy).getNodeForBucketWrite(eq(bucketId), isNull());
+    doReturn(secondary).when(regionSpy).getNodeForBucketRead(eq(bucketId));
+
+    InternalDistributedMember selected =
+        regionSpy.getBucketNodeForReadOrWrite(bucketId, registerInterestEvent);
+
+    assertThat(selected).isSameAs(primary);
+    verify(regionSpy, times(1)).getNodeForBucketWrite(anyInt(), any());
+  }
+
+  @Test
+  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeForNonRegisterInterest()
+      throws Exception {
+    final int bucketId = 0;
+    InternalDistributedMember primary = mock(InternalDistributedMember.class);
+    InternalDistributedMember secondary = mock(InternalDistributedMember.class);
+    // Event whose operation is an ordinary get.
+    EntryEventImpl plainGetEvent = mock(EntryEventImpl.class);
+    when(plainGetEvent.getOperation()).thenReturn(Operation.GET);
+    // Write lookups answer with the primary, read lookups with the secondary.
+    PartitionedRegion regionSpy = spy(partitionedRegion);
+    doReturn(primary).when(regionSpy).getNodeForBucketWrite(eq(bucketId), isNull());
+    doReturn(secondary).when(regionSpy).getNodeForBucketRead(eq(bucketId));
+
+    InternalDistributedMember selected =
+        regionSpy.getBucketNodeForReadOrWrite(bucketId, plainGetEvent);
+
+    assertThat(selected).isSameAs(secondary);
+    verify(regionSpy, times(1)).getNodeForBucketRead(anyInt());
+  }
+
+  @Test
+  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventIsNotPresent()
+      throws Exception {
+    final int bucketId = 0;
+    InternalDistributedMember primary = mock(InternalDistributedMember.class);
+    InternalDistributedMember secondary = mock(InternalDistributedMember.class);
+    // Write lookups answer with the primary, read lookups with the secondary.
+    PartitionedRegion regionSpy = spy(partitionedRegion);
+    doReturn(primary).when(regionSpy).getNodeForBucketWrite(eq(bucketId), isNull());
+    doReturn(secondary).when(regionSpy).getNodeForBucketRead(eq(bucketId));
+
+    // No client event at all.
+    InternalDistributedMember selected = regionSpy.getBucketNodeForReadOrWrite(bucketId, null);
+
+    assertThat(selected).isSameAs(secondary);
+    verify(regionSpy, times(1)).getNodeForBucketRead(anyInt());
+  }
+
+  @Test
+  public void getBucketNodeForReadOrWriteReturnsSecondaryNodeWhenClientEventOperationIsNotPresent()
+      throws Exception {
+    int bucketId = 0;
+    InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
+    InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
+    // The event exists but reports no operation; this is the case under test.
+    EntryEventImpl clientEvent = mock(EntryEventImpl.class);
+    when(clientEvent.getOperation()).thenReturn(null);
+    PartitionedRegion spyPR = spy(partitionedRegion);
+    doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(eq(bucketId), isNull());
+    doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(eq(bucketId));
+
+    // Pass the event itself: passing null would only repeat the "client event not present"
+    // test and leave the null-operation branch unexercised.
+    InternalDistributedMember memberForRegisterInterestRead =
+        spyPR.getBucketNodeForReadOrWrite(bucketId, clientEvent);
+
+    assertThat(memberForRegisterInterestRead).isSameAs(secondaryMember);
+    verify(spyPR, times(1)).getNodeForBucketRead(anyInt());
+  }
+
+  @Test
+  public void updateBucketMapsForInterestRegistrationWithSetOfKeysFetchesPrimaryBucketsForRead() {
+    Integer[] bucketIds = new Integer[] {0, 1};
+
+    InternalDistributedMember primaryMember = mock(InternalDistributedMember.class);
+    InternalDistributedMember secondaryMember = mock(InternalDistributedMember.class);
+    PartitionedRegion spyPR = spy(partitionedRegion);
+    doReturn(primaryMember).when(spyPR).getNodeForBucketWrite(anyInt(), isNull());
+    doReturn(secondaryMember).when(spyPR).getNodeForBucketRead(anyInt());
+    HashMap<InternalDistributedMember, HashSet<Integer>> nodeToBuckets = new HashMap<>();
+    // Typed as Set<Integer> so the call below needs no unchecked conversion from a raw Set.
+    Set<Integer> buckets =
+        Arrays.stream(bucketIds).collect(Collectors.toCollection(HashSet::new));
+
+    spyPR.updateNodeToBucketMap(nodeToBuckets, buckets);
+
+    // One write-node lookup per bucket id.
+    verify(spyPR, times(2)).getNodeForBucketWrite(anyInt(), isNull());
+  }
+
+  @Test
+  public void updateBucketMapsForInterestRegistrationWithAllKeysFetchesPrimaryBucketsForRead() {
+    Integer[] ids = new Integer[] {0, 1};
+
+    InternalDistributedMember primary = mock(InternalDistributedMember.class);
+    InternalDistributedMember secondary = mock(InternalDistributedMember.class);
+    PartitionedRegion regionSpy = spy(partitionedRegion);
+    doReturn(primary).when(regionSpy).getNodeForBucketWrite(anyInt(), isNull());
+    doReturn(secondary).when(regionSpy).getNodeForBucketRead(anyInt());
+
+    // A single map entry: bucket 0 mapped to the set built from the ids above.
+    HashSet valuesForBucketZero =
+        Arrays.stream(ids).collect(Collectors.toCollection(HashSet::new));
+    HashMap<Integer, HashSet> bucketKeys = new HashMap<>();
+    bucketKeys.put(Integer.valueOf(0), valuesForBucketZero);
+    HashMap<InternalDistributedMember, HashMap<Integer, HashSet>> nodeToBuckets =
+        new HashMap<>();
+
+    regionSpy.updateNodeToBucketMap(nodeToBuckets, bucketKeys);
+
+    // One map entry, so exactly one write-node lookup.
+    verify(regionSpy, times(1)).getNodeForBucketWrite(anyInt(), isNull());
+  }
+
+}