You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by es...@apache.org on 2018/11/15 22:45:40 UTC
[geode] branch feature/GEODE-6061 updated: GEODE-6061: Add more
test coverage on function with transaction.
This is an automated email from the ASF dual-hosted git repository.
eshu11 pushed a commit to branch feature/GEODE-6061
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/feature/GEODE-6061 by this push:
new fa28a1f GEODE-6061: Add more test coverage on function with transaction.
fa28a1f is described below
commit fa28a1f2df3def7223ca2aa9ca9547456e24ae63
Author: eshu <es...@pivotal.io>
AuthorDate: Thu Nov 15 14:44:41 2018 -0800
GEODE-6061: Add more test coverage on function with transaction.
---
...ionExecutionWithTransactionDistributedTest.java | 764 +++++++++++++++++++++
1 file changed, 764 insertions(+)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionWithTransactionDistributedTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionWithTransactionDistributedTest.java
new file mode 100644
index 0000000..cfac74e
--- /dev/null
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionWithTransactionDistributedTest.java
@@ -0,0 +1,764 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.execute;
+
+import static org.apache.geode.test.dunit.VM.getHostName;
+import static org.apache.geode.test.dunit.VM.getVM;
+import static org.assertj.core.api.Assertions.catchThrowable;
+import static org.assertj.core.api.Java6Assertions.assertThat;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import org.apache.geode.DataSerializable;
+import org.apache.geode.cache.CacheTransactionManager;
+import org.apache.geode.cache.PartitionAttributesFactory;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.TransactionDataRebalancedException;
+import org.apache.geode.cache.TransactionException;
+import org.apache.geode.cache.client.ClientRegionFactory;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.cache.client.PoolFactory;
+import org.apache.geode.cache.client.PoolManager;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.tier.sockets.CacheServerTestUtil;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.CacheRule;
+import org.apache.geode.test.dunit.rules.ClientCacheRule;
+import org.apache.geode.test.dunit.rules.DistributedDiskDirRule;
+import org.apache.geode.test.dunit.rules.DistributedRule;
+import org.apache.geode.test.junit.rules.serializable.SerializableTestName;
+
+public class FunctionExecutionWithTransactionDistributedTest implements
+ Serializable {
+
+ private String hostName;
+ private String uniqueName;
+ private String regionName;
+ private String replicateRegionName;
+ private VM server1;
+ private VM server2;
+ private VM server3;
+ private VM accessor;
+ private int port1;
+ private int port2;
+ private int port3;
+ private DistributedMember distributedMember;
+ private DistributedMember distributedMember2;
+ private transient PoolImpl pool;
+
+ private static final int KEY1 = 1;
+ private static final int KEY2 = 2;
+ private static final int KEY3 = 3;
+ private static final int KEY4 = 4;
+ private final String value = "value";
+
+ private enum Type {
+ ON_REGION, ON_MEMBER
+ }
+
+ @Rule
+ public DistributedRule distributedRule = new DistributedRule();
+
+ @Rule
+ public DistributedDiskDirRule distributedDiskDir = new DistributedDiskDirRule();
+
+ @Rule
+ public CacheRule cacheRule = new CacheRule();
+
+ @Rule
+ public ClientCacheRule clientCacheRule = new ClientCacheRule();
+
+ @Rule
+ public SerializableTestName testName = new SerializableTestName();
+
+ @Before
+ public void setup() throws Exception {
+ accessor = getVM(0);
+ server1 = getVM(1);
+ server2 = getVM(2);
+ server3 = getVM(3);
+
+ hostName = getHostName();
+ uniqueName = getClass().getSimpleName() + "_" + testName.getMethodName();
+ regionName = uniqueName + "_region";
+ replicateRegionName = uniqueName + "_replicateRegion";
+ }
+
+ @Test
+ public void clientCanRollbackTransactionWithMultipleFunctions() {
+ port1 = server1.invoke(() -> createServerRegions(1, 3, false));
+ port2 = server2.invoke(() -> createServerRegions(1, 3, false));
+ port3 = server3.invoke(() -> createServerRegions(1, 3, false));
+ createClientRegions(true, port1, port2, port3);
+
+ doMultipleFunctionsOnClient();
+
+ verifyDataOnServers();
+ }
+
+ private void doMultipleFunctionsOnClient() {
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ Region replicateRegion = clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ doPuts(region);
+ doPuts(replicateRegion);
+ CacheTransactionManager txManager =
+ clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+ txManager.rollback();
+ }
+
+ @Test
+ public void clientCanRollbackTransactionWithOnReplicateRegionFunction() {
+ port1 = server1.invoke(() -> createServerRegions(1, 3, false));
+ port2 = server2.invoke(() -> createServerRegions(1, 3, false));
+ port3 = server3.invoke(() -> createServerRegions(1, 3, false));
+ createClientRegions(true, port1, port2, port3);
+
+ Region replicateRegion = clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ doPuts(replicateRegion);
+ CacheTransactionManager txManager =
+ clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+ txManager.rollback();
+
+ verifyReplicateRegionDataOnServers();
+ }
+
+ private void verifyReplicateRegionDataOnServers() {
+ server1.invoke(() -> verifyData(cacheRule.getCache().getRegion(replicateRegionName)));
+ server2.invoke(() -> verifyData(cacheRule.getCache().getRegion(replicateRegionName)));
+ server3.invoke(() -> verifyData(cacheRule.getCache().getRegion(replicateRegionName)));
+ }
+
+ @Test
+ public void clientConnectToAccessorCanRollbackTransactionWithMultipleFunctions() {
+ port1 = accessor.invoke(() -> createServerRegions(1, 3, true));
+ setupServerRegions();
+ createClientRegions(true, port1);
+
+ doMultipleFunctionsOnClient();
+
+ verifyDataOnServers();
+ }
+
+ @Test
+ public void clientFunctionOnPartitionedRegionWithTransactionFailsIfWithoutFilter() {
+ accessor.invoke(() -> createServerRegions(1, 3, true));
+ port1 = server1.invoke(() -> createServerRegions(1, 3, false));
+ server2.invoke(() -> createServerRegions(1, 3, false));
+ server3.invoke(() -> createServerRegions(1, 3, false));
+ createClientRegions(true, port1);
+
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ doPuts(region);
+ CacheTransactionManager txManager =
+ clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ Throwable caughtException = catchThrowable(() -> doFunction(region,
+ new MyPartitionRegionFunction(), Type.ON_REGION, false));
+ assertThat(caughtException).isInstanceOf(FunctionException.class);
+ assertThat(caughtException.getCause()).isInstanceOf(TransactionException.class)
+ .hasMessage("Function inside a transaction cannot execute on more than one node");
+ txManager.rollback();
+
+ verifyPartitionedRegionOnServers();
+ }
+
+ private void verifyPartitionedRegionOnServers() {
+ server1.invoke(() -> verifyData(cacheRule.getCache().getRegion(regionName)));
+ server2.invoke(() -> verifyData(cacheRule.getCache().getRegion(regionName)));
+ server3.invoke(() -> verifyData(cacheRule.getCache().getRegion(regionName)));
+ }
+
+ @Test
+ public void clientCanRollbackTransactionsWithMultipleColocatedKeysFilter() {
+ port1 = server1.invoke(() -> createServerRegions(1, 3, false));
+ port2 = server2.invoke(() -> createServerRegions(1, 3, false));
+ port3 = server3.invoke(() -> createServerRegions(1, 3, false));
+ createClientRegions(true, port1, port2, port3);
+
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ Region replicateRegion = clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ doPuts(region);
+ doPuts(replicateRegion);
+ CacheTransactionManager txManager =
+ clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_REGION, true, true, true);
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+ txManager.rollback();
+
+ verifyDataOnServers();
+ }
+
+ @Test
+ public void clientFunctionWithTransactionsFailsIfWithMultipleNonColocatedKeysFilter() {
+ port1 = accessor.invoke(() -> createServerRegions(1, 3, true));
+ setupServerRegions();
+ createClientRegions(true, port1);
+
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ doPuts(region);
+ CacheTransactionManager txManager =
+ clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ Throwable caughtException = catchThrowable(() -> doFunction(region,
+ new MyPartitionRegionFunction(), Type.ON_REGION, true, true, false));
+ assertThat(caughtException).isInstanceOf(FunctionException.class);
+ assertThat(caughtException.getCause()).isInstanceOf(TransactionException.class)
+ .hasMessage("Function inside a transaction cannot execute on more than one node");
+
+ txManager.rollback();
+
+ verifyPartitionedRegionOnServers();
+ }
+
+ @Test
+ public void clientCanRollbackTransactionWithNonColocatedFunctions() {
+ port1 = server1.invoke(() -> createServerRegions(1, 3, false));
+ server2.invoke(() -> createServerRegions(1, 3, false));
+ server3.invoke(() -> createServerRegions(1, 3, false));
+ createClientRegions(true, port1);
+
+ Region region = clientCacheRule.getClientCache().getRegion(regionName);
+ Region replicateRegion = clientCacheRule.getClientCache().getRegion(replicateRegionName);
+ doPuts(region);
+ doPuts(replicateRegion);
+ CacheTransactionManager txManager =
+ clientCacheRule.getClientCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+
+ Throwable caughtException = catchThrowable(() -> doFunction2(region,
+ new MyPartitionRegionFunction(), Type.ON_REGION));
+ assertThat(caughtException).isInstanceOf(FunctionException.class);
+ assertThat(caughtException.getCause()).isInstanceOf(TransactionDataRebalancedException.class)
+ .hasMessageContaining("Function execution is not colocated with transaction.");
+
+ txManager.rollback();
+
+ verifyDataOnServers();
+ }
+
+ @Test
+ public void memberCanRollbackTransactionWithMultipleOnRegionFunctions() {
+ setupServerRegions();
+ server2.invoke(() -> doPuts());
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ Region replicateRegion = cacheRule.getCache().getRegion(replicateRegionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+ txManager.rollback();
+ });
+
+ verifyDataOnServers();
+ }
+
+ private void verifyDataOnServers() {
+ server1.invoke(() -> verifyData());
+ server2.invoke(() -> verifyData());
+ server3.invoke(() -> verifyData());
+ }
+
+ private void setupServerRegions() {
+ setupServerRegions(false);
+ }
+
+ private void setupServerRegions(boolean doPut) {
+ int redundantCopies = 1;
+ server1.invoke(() -> {
+ createServerRegions(redundantCopies, 3, false);
+ if (doPut) {
+ cacheRule.getCache().getRegion(regionName).put(KEY1, value + KEY1);
+ }
+ });
+ server2.invoke(() -> {
+ createServerRegions(redundantCopies, 3, false);
+ if (doPut) {
+ cacheRule.getCache().getRegion(regionName).put(KEY2, value + KEY2);
+ }
+ });
+ server3.invoke(() -> {
+ createServerRegions(redundantCopies, 3, false);
+ if (doPut) {
+ cacheRule.getCache().getRegion(regionName).put(KEY3, value + KEY3);
+ }
+ });
+ }
+
+ @Test
+ public void memberCanRollbackTransactionWithOnReplicateRegionFunction() {
+ setupServerRegions();
+ server2.invoke(() -> doPuts());
+
+ server1.invoke(() -> {
+ Region replicateRegion = cacheRule.getCache().getRegion(replicateRegionName);
+ doPuts(replicateRegion);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+ txManager.rollback();
+ });
+
+ verifyReplicateRegionDataOnServers();
+ }
+
+ @Test
+ public void accessorCanRollbackTransactionWithMultipleFunctions() {
+ accessor.invoke(() -> createServerRegions(1, 3, true));
+ setupServerRegions();
+ accessor.invoke(() -> doPuts());
+
+ accessor.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ Region replicateRegion = cacheRule.getCache().getRegion(replicateRegionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+ txManager.rollback();
+ });
+
+ verifyDataOnServers();
+ }
+
+ @Test
+ public void memberFunctionOnPartitionedRegionWithTransactionFailsIfWithoutFilter() {
+ setupServerRegions();
+ server3.invoke(() -> doPuts());
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ doPuts(region);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ Throwable caughtException = catchThrowable(() -> doFunction(region,
+ new MyPartitionRegionFunction(), Type.ON_REGION, false));
+ assertThat(caughtException).isInstanceOf(TransactionException.class)
+ .hasMessage("Function inside a transaction cannot execute on more than one node");
+ txManager.rollback();
+ });
+
+ verifyPartitionedRegionOnServers();
+ }
+
+ @Test
+ public void memberCanRollbackTransactionsWithMultipleColocatedKeysFilter() {
+ setupServerRegions();
+ server2.invoke(() -> doPuts());
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ Region replicateRegion = cacheRule.getCache().getRegion(replicateRegionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_REGION, true, true, true);
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+ txManager.rollback();
+ });
+
+ verifyDataOnServers();
+ }
+
+ @Test
+ public void memberFunctionWithTransactionsFailsIfWithMultipleNonColocatedKeysFilter() {
+ setupServerRegions();
+ server1.invoke(() -> doPuts());
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ Throwable caughtException = catchThrowable(() -> doFunction(region,
+ new MyPartitionRegionFunction(), Type.ON_REGION, true, true, false));
+ assertThat(caughtException).isInstanceOf(TransactionException.class)
+ .hasMessage("Function inside a transaction cannot execute on more than one node");
+
+ txManager.rollback();
+ });
+
+ verifyPartitionedRegionOnServers();
+ }
+
+ @Test
+ public void memberTransactionFailsWithNonColocatedFunctions() {
+ setupServerRegions();
+ server1.invoke(() -> doPuts());
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ Region replicateRegion = cacheRule.getCache().getRegion(replicateRegionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+
+ Throwable caughtException = catchThrowable(() -> doFunction2(region,
+ new MyPartitionRegionFunction(), Type.ON_REGION));
+ assertThat(caughtException).isInstanceOf(TransactionDataRebalancedException.class)
+ .hasMessageContaining("Function execution is not colocated with transaction.");
+
+ txManager.rollback();
+ });
+
+ verifyDataOnServers();
+ }
+
+ @Test
+ public void transactionFailsIfIsOnMembersFunction() {
+ setupServerRegions();
+ server1.invoke(() -> doPuts());
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ Throwable caughtException = catchThrowable(() -> doFunction(region,
+ new MyPartitionRegionFunction(), Type.ON_MEMBER, false));
+ assertThat(caughtException).isInstanceOf(TransactionException.class)
+ .hasMessage("Function inside a transaction cannot execute on more than one node");
+
+ txManager.rollback();
+ });
+
+ verifyPartitionedRegionOnServers();
+ }
+
+ @Test
+ public void transactionFailsIfIsOnMultipleMembersFunction() {
+ setupServerRegions();
+ distributedMember = server1.invoke(() -> {
+ return cacheRule.getCache().getDistributedSystem().getDistributedMember();
+ });
+ distributedMember2 = server2.invoke(() -> {
+ return cacheRule.getCache().getDistributedSystem().getDistributedMember();
+ });
+ server1.invoke(() -> doPuts());
+
+ server1.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ Throwable caughtException = catchThrowable(() -> doFunction(region,
+ new MyPartitionRegionFunction(), Type.ON_MEMBER, false));
+ assertThat(caughtException).isInstanceOf(TransactionException.class)
+ .hasMessage("Function inside a transaction cannot execute on more than one node");
+
+ txManager.rollback();
+ });
+
+ verifyPartitionedRegionOnServers();
+ }
+
+ @Test
+ public void accessorCanRollbackTransactionWithOnMemberFunctions() {
+ accessor.invoke(() -> createServerRegions(1, 3, true));
+ setupServerRegions(true);
+ distributedMember = server1.invoke(() -> {
+ return cacheRule.getCache().getDistributedSystem().getDistributedMember();
+ });
+ server2.invoke(() -> doPuts());
+
+ accessor.invoke(() -> {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_MEMBER, false);
+ txManager.rollback();
+ });
+
+ verifyPartitionedRegionOnServers();
+ }
+
+ @Test
+ public void memberCanRollbackTransactionWithOnRegionAndOnMemberFunctions() {
+ setupServerRegions(true);
+ distributedMember = server1.invoke(() -> {
+ return cacheRule.getCache().getDistributedSystem().getDistributedMember();
+ });
+ server2.invoke(() -> doPuts());
+
+ server1.invoke(() -> doOnRegionAndOnMemberFunction());
+
+ verifyDataOnServers();
+ }
+
+ @Test
+ public void accessorCanRollbackTransactionWithOnRegionAndOnMemberFunctions() {
+ accessor.invoke(() -> createServerRegions(1, 3, true));
+ setupServerRegions(true);
+ distributedMember = server1.invoke(() -> {
+ return cacheRule.getCache().getDistributedSystem().getDistributedMember();
+ });
+ server2.invoke(() -> doPuts());
+
+ accessor.invoke(() -> doOnRegionAndOnMemberFunction());
+
+ verifyDataOnServers();
+ }
+
+ private void doOnRegionAndOnMemberFunction() {
+ Region region = cacheRule.getCache().getRegion(regionName);
+ Region replicateRegion = null;
+ replicateRegion = cacheRule.getCache().getRegion(replicateRegionName);
+ CacheTransactionManager txManager =
+ cacheRule.getCache().getCacheTransactionManager();
+ txManager.begin();
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_REGION, true);
+ doFunction(replicateRegion, new MyReplicateRegionFunction(), Type.ON_REGION, false);
+ doFunction(region, new MyPartitionRegionFunction(), Type.ON_MEMBER, false);
+ txManager.rollback();
+ }
+
+ private int createServerRegions(int redundantCopies, int totalNumOfBuckets,
+ boolean isAccessor) throws Exception {
+ PartitionAttributesFactory factory = new PartitionAttributesFactory();
+ factory.setRedundantCopies(redundantCopies).setTotalNumBuckets(totalNumOfBuckets);
+ if (isAccessor) {
+ factory.setLocalMaxMemory(0);
+ }
+
+ cacheRule.getOrCreateCache().createRegionFactory(RegionShortcut.PARTITION)
+ .setPartitionAttributes(factory.create()).create(regionName);
+ cacheRule.getCache().createRegionFactory(RegionShortcut.REPLICATE)
+ .create(replicateRegionName);
+ CacheServer server = cacheRule.getCache().addCacheServer();
+ server.setPort(0);
+ server.start();
+ return server.getPort();
+ }
+
+ private void createClientRegions(boolean singleHopEnabled, int... ports) {
+ clientCacheRule.createClientCache();
+
+ CacheServerTestUtil.disableShufflingOfEndpoints();
+ try {
+ pool = getPool(singleHopEnabled, ports);
+ } finally {
+ CacheServerTestUtil.enableShufflingOfEndpoints();
+ }
+
+ ClientRegionFactory crf =
+ clientCacheRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.LOCAL);
+ crf.setPoolName(pool.getName());
+ crf.create(regionName);
+ crf.create(replicateRegionName);
+
+ if (ports.length > 1) {
+ pool.acquireConnection(new ServerLocation(hostName, ports[0]));
+ }
+ }
+
+ private PoolImpl getPool(boolean singleHopEnabled, int... ports) {
+ PoolFactory factory = PoolManager.createFactory();
+ for (int port : ports) {
+ factory.addServer(hostName, port);
+ }
+ factory.setPRSingleHopEnabled(singleHopEnabled);
+ return (PoolImpl) factory.create(uniqueName);
+ }
+
+ private void doPuts() {
+ doPuts(cacheRule.getCache().getRegion(regionName));
+ doPuts(cacheRule.getCache().getRegion(replicateRegionName));
+ }
+
+ private void doPuts(Region region) {
+ region.put(KEY1, value + KEY1);
+ region.put(KEY2, value + KEY2);
+ region.put(KEY3, value + KEY3);
+ region.put(KEY4, value + KEY4);
+ }
+
+ private void verifyData() {
+ verifyData(cacheRule.getCache().getRegion(regionName));
+ verifyData(cacheRule.getCache().getRegion(replicateRegionName));
+ }
+
+ private void verifyData(Region region) {
+ assertThat(region.get(KEY1)).isEqualTo(value + KEY1);
+ assertThat(region.get(KEY2)).isEqualTo(value + KEY2);
+ assertThat(region.get(KEY3)).isEqualTo(value + KEY3);
+ assertThat(region.get(KEY4)).isEqualTo(value + KEY4);
+ }
+
+ private void doFunction(Region region, Function function, Type type, boolean withFilter) {
+ doFunction(region, function, type, withFilter, false);
+ }
+
+ private void doFunction(Region region, Function function, Type type, boolean withFilter,
+ boolean withMultipleKeys) {
+ doFunction(region, function, type, withFilter, withMultipleKeys, false);
+ }
+
+ private void doFunction(Region region, Function function, Type type, boolean withFilter,
+ boolean withMultipleKeys, boolean areColocatedKeys) {
+ Execution execution;
+ Set keySet = new HashSet();
+ keySet.add(KEY1);
+ if (withMultipleKeys) {
+ if (areColocatedKeys) {
+ keySet.add(KEY4);
+ } else {
+ keySet.add(KEY2);
+ }
+ }
+ switch (type) {
+ case ON_MEMBER:
+ if (distributedMember != null) {
+ if (distributedMember2 != null) {
+ Set membersSet = new HashSet();
+ membersSet.add(distributedMember);
+ membersSet.add(distributedMember2);
+ execution = FunctionService.onMembers(membersSet);
+ } else {
+ execution = FunctionService.onMember(distributedMember).setArguments(regionName);
+ }
+ } else {
+ execution = FunctionService.onMembers();
+ }
+ break;
+ case ON_REGION:
+ execution = FunctionService.onRegion(region);
+ if (withFilter) {
+ execution = execution.withFilter(keySet);
+ }
+ break;
+ default:
+ throw new RuntimeException("unexpected type");
+ }
+
+ ResultCollector resultCollector = execution.execute(function);
+ resultCollector.getResult();
+ }
+
+ private void doFunction2(Region region, Function function, Type type) {
+ Execution execution;
+ Set keySet = new HashSet();
+ keySet.add(KEY2);
+ switch (type) {
+ case ON_MEMBER:
+ execution = FunctionService.onMembers();
+ break;
+ case ON_REGION:
+ execution = FunctionService.onRegion(region);
+ execution = execution.withFilter(keySet);
+ break;
+ default:
+ throw new RuntimeException("unexpected type");
+ }
+
+ ResultCollector resultCollector = execution.execute(function);
+ resultCollector.getResult();
+ }
+
+ public static class MyPartitionRegionFunction implements Function, DataSerializable {
+ @Override
+ public void execute(FunctionContext context) {
+ if (context instanceof RegionFunctionContext) {
+ PartitionedRegion region =
+ (PartitionedRegion) ((RegionFunctionContext) context).getDataSet();
+ Set keySet = ((RegionFunctionContext) context).getFilter();
+ Iterator iterator = keySet.iterator();
+ while (iterator.hasNext()) {
+ Object key = iterator.next();
+ region.destroy(key);
+ }
+ } else if (context instanceof FunctionContextImpl) {
+ String regionName = context.getArguments().toString();
+ Region region = context.getCache().getRegion(regionName);
+ region.destroy(KEY4);
+ }
+ context.getResultSender().lastResult(Boolean.TRUE);
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+
+ }
+
+ @Override
+ public boolean optimizeForWrite() {
+ return true;
+ }
+ }
+
+ public static class MyReplicateRegionFunction implements Function, DataSerializable {
+ @Override
+ public void execute(FunctionContext context) {
+ if (context instanceof RegionFunctionContext) {
+ Region region = ((RegionFunctionContext) context).getDataSet();
+ region.destroy(KEY2);
+ region.destroy(KEY3);
+ context.getResultSender().lastResult(Boolean.TRUE);
+ }
+ }
+
+ @Override
+ public void toData(DataOutput out) throws IOException {
+
+ }
+
+ @Override
+ public void fromData(DataInput in) throws IOException, ClassNotFoundException {
+
+ }
+ }
+}