You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by jb...@apache.org on 2019/06/22 01:56:32 UTC
[geode] branch develop updated: GEODE-6798: Refactoring of client
function execution logic (#3710)
This is an automated email from the ASF dual-hosted git repository.
jbarrett pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 18530da GEODE-6798: Refactoring of client function execution logic (#3710)
18530da is described below
commit 18530da5ae33fc2c922bb4a2aeb5e00a57eaa31e
Author: albertogpz <al...@est.tech>
AuthorDate: Sat Jun 22 03:56:19 2019 +0200
GEODE-6798: Refactoring of client function execution logic (#3710)
* Some unit tests for the ExecuteRegionFunctionOpImplTest constructors
* Do not allocate failedNodes Collection unless necessary
---
.../client/internal/ExecuteRegionFunctionOp.java | 335 +++++++--------------
.../internal/ExecuteRegionFunctionOpImplTest.java | 128 ++++++++
2 files changed, 233 insertions(+), 230 deletions(-)
diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
index 6c95b02..641fd26 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOp.java
@@ -18,6 +18,7 @@ package org.apache.geode.cache.client.internal;
import static org.apache.geode.internal.cache.execute.AbstractExecution.DEFAULT_CLIENT_FUNCTION_TIMEOUT;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
@@ -63,49 +64,32 @@ public class ExecuteRegionFunctionOp {
// no instances allowed
}
- /**
- * Does a execute Function on a server using connections from the given pool to communicate with
- * the server.
- *
- * @param pool the pool to use to communicate with the server.
- * @param region the name of the region to do the put on
- * @param function to be executed
- * @param serverRegionExecutor which will return argument and filter
- * @param resultCollector is used to collect the results from the Server
- * @param timeoutMs timeout in milliseconds
- */
- public static void execute(ExecutablePool pool, String region, Function function,
+ private static void execute(ExecutablePool pool, String region, String function,
ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector,
- byte hasResult, int mRetryAttempts, final int timeoutMs) {
-
- ExecuteRegionFunctionOpImpl op =
- new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
- resultCollector, hasResult, new HashSet<>(), timeoutMs);
- boolean reexecute = false;
- boolean reexecuteForServ = false;
- Set<String> failedNodes = new HashSet<>();
- AbstractOp reexecOp;
-
- int maxRetryAttempts = mRetryAttempts;
- if (!function.isHA()) {
+ byte hasResult, int maxRetryAttempts, boolean isHA, boolean optimizeForWrite,
+ ExecuteRegionFunctionOpImpl op, boolean isReexecute, Set<String> failedNodes) {
+
+ if (!isHA) {
maxRetryAttempts = 0;
}
do {
try {
- if (reexecuteForServ) {
- reexecOp = new ExecuteRegionFunctionOpImpl(op,
+ if (isReexecute) {
+ failedNodes = ensureMutability(failedNodes);
+ op = new ExecuteRegionFunctionOpImpl(op,
(byte) 1/* isReExecute */, failedNodes);
- pool.execute(reexecOp, 0);
- } else {
- pool.execute(op, 0);
}
- reexecute = false;
- reexecuteForServ = false;
+ pool.execute(op, 0);
+ return;
} catch (InternalFunctionInvocationTargetException e) {
- reexecute = true;
resultCollector.clearResults();
+ if (!isHA) {
+ return;
+ }
+ isReexecute = true;
Set<String> failedNodesIds = e.getFailedNodeSet();
+ failedNodes = ensureMutability(failedNodes);
failedNodes.clear();
if (failedNodesIds != null) {
failedNodes.addAll(failedNodesIds);
@@ -125,126 +109,68 @@ public class ExecuteRegionFunctionOp {
throw se;
}
- reexecuteForServ = true;
+ isReexecute = true;
resultCollector.clearResults();
+ failedNodes = ensureMutability(failedNodes);
failedNodes.clear();
}
- } while (reexecuteForServ);
+ } while (true);
+ }
- if (reexecute && function.isHA()) {
- ExecuteRegionFunctionOp.reexecute(pool, region, function, serverRegionExecutor,
- resultCollector, hasResult, failedNodes, maxRetryAttempts, timeoutMs);
+ private static Set<String> ensureMutability(final Set<String> failedNodes) {
+ if (failedNodes == Collections.EMPTY_SET) {
+ return new HashSet<>();
}
+ return failedNodes;
}
- public static void execute(ExecutablePool pool, String region, String function,
+ /**
+ * Does a execute Function on a server using connections from the given pool to communicate with
+ * the server.
+ *
+ * @param pool the pool to use to communicate with the server.
+ * @param region the name of the region to do the put on
+ * @param function to be executed
+ * @param serverRegionExecutor which will return argument and filter
+ * @param resultCollector is used to collect the results from the Server
+ * @param hasResult is used to collect the results from the Server
+ * @param maxRetryAttempts Maximum number of retry attempts
+ * @param timeoutMs timeout in milliseconds
+ */
+ static void execute(ExecutablePool pool, String region, Function function,
ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector,
- byte hasResult, int mRetryAttempts, boolean isHA, boolean optimizeForWrite,
- final int timeoutMs) {
+ byte hasResult, int maxRetryAttempts, final int timeoutMs) {
- ExecuteRegionFunctionOpImpl op =
- new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
- resultCollector, hasResult, new HashSet<>(), isHA, optimizeForWrite, true, timeoutMs);
- boolean reexecute = false;
- boolean reexecuteForServ = false;
- Set<String> failedNodes = new HashSet<>();
- AbstractOp reexecOp;
+ ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function,
+ serverRegionExecutor, resultCollector, timeoutMs);
- int maxRetryAttempts = mRetryAttempts;
- if (!isHA) {
- maxRetryAttempts = 0;
- }
-
- do {
- try {
- if (reexecuteForServ) {
- reexecOp = new ExecuteRegionFunctionOpImpl(op,
- (byte) 1/* isReExecute */, failedNodes);
- pool.execute(reexecOp, 0);
- } else {
- pool.execute(op, 0);
- }
- reexecute = false;
- reexecuteForServ = false;
- } catch (InternalFunctionInvocationTargetException e) {
- reexecute = true;
- resultCollector.clearResults();
- Set<String> failedNodesIds = e.getFailedNodeSet();
- failedNodes.clear();
- if (failedNodesIds != null) {
- failedNodes.addAll(failedNodesIds);
- }
- } catch (ServerOperationException | NoAvailableServersException failedException) {
- throw failedException;
- } catch (ServerConnectivityException se) {
-
- if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
- // If the retryAttempt is set to default(-1). Try it on all servers once.
- // Calculating number of servers when function is re-executed as it involves
- // messaging locator.
- maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
- }
+ execute(pool, region, function.getId(), serverRegionExecutor, resultCollector, hasResult,
+ maxRetryAttempts, function.isHA(), function.optimizeForWrite(), op, false,
+ Collections.emptySet());
+ }
- if ((maxRetryAttempts--) < 1) {
- throw se;
- }
+ static void execute(ExecutablePool pool, String region, String function,
+ ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector,
+ byte hasResult, int maxRetryAttempts, boolean isHA, boolean optimizeForWrite,
+ final int timeoutMs) {
- reexecuteForServ = true;
- resultCollector.clearResults();
- failedNodes.clear();
- }
- } while (reexecuteForServ);
+ ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function,
+ serverRegionExecutor, resultCollector, hasResult, isHA, optimizeForWrite,
+ true, timeoutMs);
- if (reexecute && isHA) {
- ExecuteRegionFunctionOp.reexecute(pool, region, function, serverRegionExecutor,
- resultCollector, hasResult, failedNodes, maxRetryAttempts, isHA, optimizeForWrite,
- timeoutMs);
- }
+ execute(pool, region, function, serverRegionExecutor, resultCollector, hasResult,
+ maxRetryAttempts, isHA, optimizeForWrite, op, false, Collections.emptySet());
}
static void reexecute(ExecutablePool pool, String region, Function function,
ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector resultCollector,
byte hasResult, Set<String> failedNodes, int retryAttempts, final int timeoutMs) {
- ExecuteRegionFunctionOpImpl op =
- new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
- resultCollector, hasResult, new HashSet<>(), timeoutMs);
- boolean reexecute = true;
- int maxRetryAttempts = retryAttempts;
-
- do {
- AbstractOp reExecuteOp =
- new ExecuteRegionFunctionOpImpl(op, (byte) 1/* isReExecute */, failedNodes);
-
- try {
- pool.execute(reExecuteOp, 0);
- reexecute = false;
- } catch (InternalFunctionInvocationTargetException e) {
- resultCollector.clearResults();
- Set<String> failedNodesIds = e.getFailedNodeSet();
- failedNodes.clear();
- if (failedNodesIds != null) {
- failedNodes.addAll(failedNodesIds);
- }
- } catch (ServerOperationException | NoAvailableServersException failedException) {
- throw failedException;
- } catch (ServerConnectivityException se) {
-
- if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
- // If the retryAttempt is set to default(-1). Try it on all servers once.
- // Calculating number of servers when function is re-executed as it involves
- // messaging locator.
- maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
- }
-
- if ((maxRetryAttempts--) < 1) {
- throw se;
- }
+ ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function,
+ serverRegionExecutor, resultCollector, timeoutMs);
- resultCollector.clearResults();
- failedNodes.clear();
- }
- } while (reexecute);
+ execute(pool, region, function.getId(), serverRegionExecutor, resultCollector, hasResult,
+ retryAttempts, function.isHA(), function.optimizeForWrite(), op, true, failedNodes);
}
static void reexecute(ExecutablePool pool, String region, String function,
@@ -252,45 +178,12 @@ public class ExecuteRegionFunctionOp {
byte hasResult, Set<String> failedNodes, int retryAttempts, boolean isHA,
boolean optimizeForWrite, final int timeoutMs) {
- ExecuteRegionFunctionOpImpl op =
- new ExecuteRegionFunctionOpImpl(region, function, serverRegionExecutor,
- resultCollector, hasResult, new HashSet<>(), isHA, optimizeForWrite, true, timeoutMs);
- boolean reexecute = true;
- int maxRetryAttempts = retryAttempts;
-
- do {
- ExecuteRegionFunctionOpImpl reExecuteOp =
- new ExecuteRegionFunctionOpImpl(op, (byte) 1/* isReExecute */, failedNodes);
-
- try {
- pool.execute(reExecuteOp, 0);
- reexecute = false;
- } catch (InternalFunctionInvocationTargetException e) {
- resultCollector.clearResults();
- Set<String> failedNodesIds = e.getFailedNodeSet();
- failedNodes.clear();
- if (failedNodesIds != null) {
- failedNodes.addAll(failedNodesIds);
- }
- } catch (ServerOperationException | NoAvailableServersException failedException) {
- throw failedException;
- } catch (ServerConnectivityException se) {
+ ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, function,
+ serverRegionExecutor, resultCollector, hasResult, isHA,
+ optimizeForWrite, true, timeoutMs);
- if (maxRetryAttempts == PoolFactory.DEFAULT_RETRY_ATTEMPTS) {
- // If the retryAttempt is set to default(-1). Try it on all servers once.
- // Calculating number of servers when function is re-executed as it involves
- // messaging locator.
- maxRetryAttempts = ((PoolImpl) pool).getConnectionSource().getAllServers().size() - 1;
- }
-
- if ((maxRetryAttempts--) < 1) {
- throw se;
- }
-
- resultCollector.clearResults();
- failedNodes.clear();
- }
- } while (reexecute);
+ execute(pool, region, function, serverRegionExecutor, resultCollector, hasResult,
+ retryAttempts, isHA, optimizeForWrite, op, true, failedNodes);
}
static class ExecuteRegionFunctionOpImpl extends AbstractOpWithTimeout {
@@ -319,29 +212,28 @@ public class ExecuteRegionFunctionOp {
private FunctionException functionException;
+ private static final int PART_COUNT = 8;
- ExecuteRegionFunctionOpImpl(String region, Function function,
- ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc, byte hasResult,
- Set<String> removedNodes, final int timeoutMs) {
- super(MessageType.EXECUTE_REGION_FUNCTION,
- 8 + serverRegionExecutor.getFilter().size() + removedNodes.size(), timeoutMs);
+ private static int getMessagePartCount(int filterSize, int removedNodesSize) {
+ return PART_COUNT + filterSize + removedNodesSize;
+ }
+
+ private void fillMessage(String region, Function function, String functionId,
+ ServerRegionFunctionExecutor serverRegionExecutor,
+ Set<String> removedNodes, byte functionState, byte flags) {
Set routingObjects = serverRegionExecutor.getFilter();
Object args = serverRegionExecutor.getArguments();
- byte functionState = AbstractExecution.getFunctionState(function.isHA(), function.hasResult(),
- function.optimizeForWrite());
MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument();
-
addBytes(functionState);
getMessage().addStringPart(region, true);
- if (serverRegionExecutor.isFnSerializationReqd()) {
+ if (function != null && serverRegionExecutor.isFnSerializationReqd()) {
getMessage().addStringOrObjPart(function);
} else {
- getMessage().addStringOrObjPart(function.getId());
+ getMessage().addStringOrObjPart(functionId);
}
+
getMessage().addObjPart(args);
getMessage().addObjPart(memberMappedArg);
- executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
- byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
getMessage().addBytesPart(new byte[] {flags});
getMessage().addIntPart(routingObjects.size());
@@ -352,14 +244,27 @@ public class ExecuteRegionFunctionOp {
for (Object nodes : removedNodes) {
getMessage().addStringOrObjPart(nodes);
}
+ }
+ ExecuteRegionFunctionOpImpl(String region, Function function,
+ ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc,
+ final int timeoutMs) {
+ super(MessageType.EXECUTE_REGION_FUNCTION,
+ getMessagePartCount(serverRegionExecutor.getFilter().size(), 0), timeoutMs);
+ executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
+ byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
+ byte functionState = AbstractExecution.getFunctionState(function.isHA(), function.hasResult(),
+ function.optimizeForWrite());
+ failedNodes = Collections.emptySet();
+ fillMessage(region,
+ function, function.getId(),
+ serverRegionExecutor, failedNodes, functionState, flags);
resultCollector = rc;
regionName = region;
this.function = function;
functionId = function.getId();
executor = serverRegionExecutor;
- this.hasResult = functionState;
- failedNodes = removedNodes;
+ hasResult = functionState;
isHA = function.isHA();
}
@@ -378,58 +283,45 @@ public class ExecuteRegionFunctionOp {
isHA = true;
}
- ExecuteRegionFunctionOpImpl(String region, String function,
+ ExecuteRegionFunctionOpImpl(String region, String functionId,
ServerRegionFunctionExecutor serverRegionExecutor, ResultCollector rc, byte hasResult,
- Set<String> removedNodes, boolean isHA, boolean optimizeForWrite,
+ boolean isHA, boolean optimizeForWrite,
boolean calculateFnState, final int timeoutMs) {
super(MessageType.EXECUTE_REGION_FUNCTION,
- 8 + serverRegionExecutor.getFilter().size() + removedNodes.size(), timeoutMs);
- Set routingObjects = serverRegionExecutor.getFilter();
+ getMessagePartCount(serverRegionExecutor.getFilter().size(), 0), timeoutMs);
+
byte functionState = hasResult;
if (calculateFnState) {
functionState = AbstractExecution.getFunctionState(isHA,
hasResult == (byte) 1, optimizeForWrite);
}
- Object args = serverRegionExecutor.getArguments();
- MemberMappedArgument memberMappedArg = serverRegionExecutor.getMemberMappedArgument();
- addBytes(functionState);
- getMessage().addStringPart(region, true);
- getMessage().addStringOrObjPart(function);
- getMessage().addObjPart(args);
- getMessage().addObjPart(memberMappedArg);
executeOnBucketSet = serverRegionExecutor.getExecuteOnBucketSetFlag();
byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
- getMessage().addBytesPart(new byte[] {flags});
- getMessage().addIntPart(routingObjects.size());
- for (Object key : routingObjects) {
- getMessage().addStringOrObjPart(key);
- }
- getMessage().addIntPart(removedNodes.size());
- for (Object nodes : removedNodes) {
- getMessage().addStringOrObjPart(nodes);
- }
+ failedNodes = Collections.emptySet();
+ fillMessage(region, null, functionId, serverRegionExecutor, failedNodes, functionState,
+ flags);
resultCollector = rc;
regionName = region;
- functionId = function;
+ this.functionId = functionId;
executor = serverRegionExecutor;
this.hasResult = functionState;
- failedNodes = removedNodes;
this.isHA = isHA;
}
ExecuteRegionFunctionOpImpl(ExecuteRegionFunctionSingleHopOpImpl newop) {
this(newop.getRegionName(), newop.getFunctionId(), newop.getExecutor(),
- newop.getResultCollector(), newop.getHasResult(), new HashSet<>(), newop.isHA(),
+ newop.getResultCollector(), newop.getHasResult(), newop.isHA(),
newop.optimizeForWrite(), false, newop.getTimeoutMs());
}
ExecuteRegionFunctionOpImpl(ExecuteRegionFunctionOpImpl op, byte isReExecute,
Set<String> removedNodes) {
super(MessageType.EXECUTE_REGION_FUNCTION,
- 8 + op.executor.getFilter().size() + removedNodes.size(), op.getTimeoutMs());
+ getMessagePartCount(op.executor.getFilter().size(), removedNodes.size()),
+ op.getTimeoutMs());
this.isReExecute = isReExecute;
resultCollector = op.resultCollector;
function = op.function;
@@ -445,30 +337,10 @@ public class ExecuteRegionFunctionOp {
resultCollector.clearResults();
}
- Set routingObjects = executor.getFilter();
- Object args = executor.getArguments();
- MemberMappedArgument memberMappedArg = executor.getMemberMappedArgument();
- getMessage().clear();
- addBytes(hasResult);
- getMessage().addStringPart(regionName, true);
- if (executor.isFnSerializationReqd()) {
- getMessage().addStringOrObjPart(function);
- } else {
- getMessage().addStringOrObjPart(functionId);
- }
- getMessage().addObjPart(args);
- getMessage().addObjPart(memberMappedArg);
byte flags = ExecuteFunctionHelper.createFlags(executeOnBucketSet, isReExecute);
- getMessage().addBytesPart(new byte[] {flags});
- getMessage().addIntPart(routingObjects.size());
- for (Object key : routingObjects) {
- getMessage().addStringOrObjPart(key);
- }
- getMessage().addIntPart(removedNodes.size());
- for (Object nodes : removedNodes) {
- getMessage().addStringOrObjPart(nodes);
- }
+ fillMessage(regionName, function, functionId,
+ executor, removedNodes, hasResult, flags);
}
private void addBytes(byte functionStateOrHasResult) {
@@ -524,6 +396,7 @@ public class ExecuteRegionFunctionOp {
.getCause() instanceof InternalFunctionInvocationTargetException) {
InternalFunctionInvocationTargetException ifite =
(InternalFunctionInvocationTargetException) ex.getCause();
+ failedNodes = ensureMutability(failedNodes);
failedNodes.addAll(ifite.getFailedNodeSet());
addFunctionException((FunctionException) result);
} else {
@@ -555,6 +428,7 @@ public class ExecuteRegionFunctionOp {
if (resultResponse instanceof ArrayList) {
DistributedMember memberID =
(DistributedMember) ((ArrayList) resultResponse).get(1);
+ failedNodes = ensureMutability(failedNodes);
failedNodes.add(memberID.getId());
}
functionException = new FunctionException(fite);
@@ -611,6 +485,7 @@ public class ExecuteRegionFunctionOp {
.getCause() instanceof InternalFunctionInvocationTargetException) {
InternalFunctionInvocationTargetException ifite =
(InternalFunctionInvocationTargetException) ex.getCause();
+ failedNodes = ensureMutability(failedNodes);
failedNodes.addAll(ifite.getFailedNodeSet());
}
throw ex;
diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java
new file mode 100644
index 0000000..72b5748
--- /dev/null
+++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/ExecuteRegionFunctionOpImplTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import junitparams.JUnitParamsRunner;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.client.internal.ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.internal.cache.execute.ServerRegionFunctionExecutor;
+import org.apache.geode.test.junit.categories.ClientServerTest;
+
+
+/**
+ * Test ExecutionRegionFunctionOpImpl class
+ */
+@Category({ClientServerTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class ExecuteRegionFunctionOpImplTest {
+
+ @Test
+ public void testExecuteRegionFunctionOpImplWithFunction() {
+ ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op = createOpWithFunctionTwoFilters();
+
+ int numberOfParts = 10;
+ assertEquals(numberOfParts, op.getMessage().getNumberOfParts());
+ for (int i = 0; i < numberOfParts; i++) {
+ assertNotNull(op.getMessage().getPart(i));
+ }
+ assertNull(op.getMessage().getPart(numberOfParts));
+ }
+
+ @Test
+ public void testExecuteRegionFunctionOpImplWithFunctionIdCalculateFnState() {
+ ExecuteRegionFunctionOpImpl op = createOpWithFunctionIdOneFilter();
+
+ int numberOfParts = 9;
+ assertEquals(numberOfParts, op.getMessage().getNumberOfParts());
+ for (int i = 0; i < numberOfParts; i++) {
+ assertNotNull(op.getMessage().getPart(i));
+ }
+ assertNull(op.getMessage().getPart(numberOfParts));
+ }
+
+ @Test
+ public void testExecuteRegionFunctionOpImplWithOpAndIsReexecute() {
+ ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op = createOpWithFunctionTwoFilters();
+
+ HashSet<String> removedNodes = new HashSet(Arrays.asList("node1", "node2", "node3"));
+
+ ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl newOp =
+ new ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl(op, (byte) 1, removedNodes);
+
+ int numberOfParts = 13;
+ assertEquals(numberOfParts, newOp.getMessage().getNumberOfParts());
+ for (int i = 0; i < numberOfParts; i++) {
+ assertNotNull(newOp.getMessage().getPart(i));
+ }
+ assertNull(newOp.getMessage().getPart(numberOfParts));
+ }
+
+ private ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl createOpWithFunctionTwoFilters() {
+ String region = "testRegion";
+ String functionId = "testFunctionId";
+ Function function = mock(Function.class);
+ ServerRegionFunctionExecutor serverRegionExecutor = mock(ServerRegionFunctionExecutor.class);
+ Set filter = new HashSet(Arrays.asList("one", "two"));
+ when(serverRegionExecutor.getFilter()).thenReturn(filter);
+ byte functionState = (byte) 1;
+ byte flags = (byte) 2;
+ ResultCollector resultCollector = mock(ResultCollector.class);
+ int timeoutMs = 100;
+
+ ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl op =
+ new ExecuteRegionFunctionOp.ExecuteRegionFunctionOpImpl(region, function,
+ serverRegionExecutor, resultCollector,
+ timeoutMs);
+ return op;
+ }
+
+ private ExecuteRegionFunctionOpImpl createOpWithFunctionIdOneFilter() {
+ String region = "testRegion";
+ String functionId = "testFunctionId";
+ Function function = null;
+ ServerRegionFunctionExecutor serverRegionExecutor = mock(ServerRegionFunctionExecutor.class);
+ Set filter = new HashSet(Arrays.asList("one"));
+ when(serverRegionExecutor.getFilter()).thenReturn(filter);
+ byte functionState = (byte) 1;
+ byte flags = (byte) 1;
+ byte hasResult = (byte) 1;
+ boolean isHA = false;
+ ResultCollector resultCollector = mock(ResultCollector.class);
+ int timeoutMs = 100;
+ boolean optimizeForWrite = true;
+ boolean isReexecute = false;
+
+ ExecuteRegionFunctionOpImpl op = new ExecuteRegionFunctionOpImpl(region, functionId,
+ serverRegionExecutor, resultCollector, hasResult, isHA, optimizeForWrite,
+ true, timeoutMs);
+ return op;
+ }
+
+}