You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by je...@apache.org on 2019/06/18 15:38:20 UTC
[geode] 02/02: Revert "GEODE-6588: Properly type Function execution
related interfaces. (#3691)"
This is an automated email from the ASF dual-hosted git repository.
jensdeppe pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
commit a5aba9165d2fab083c7c2543f38beb61c82e6c8c
Author: Jens Deppe <jd...@pivotal.io>
AuthorDate: Tue Jun 18 08:30:15 2019 -0700
Revert "GEODE-6588: Properly type Function execution related interfaces. (#3691)"
This reverts commit 6c62540edd5637d5e2bd3a51b11279a2ba825c33.
---
.../ClientFunctionTimeoutRegressionTest.java | 10 +--
.../execute/PRFunctionExecutionDUnitTest.java | 75 +++++++++---------
.../FunctionExecutionOnLonerRegressionTest.java | 14 +---
.../org/apache/geode/cache/execute/Execution.java | 21 +++---
.../geode/cache/execute/FunctionService.java | 27 +++----
.../internal/deadlock/GemFireDeadlockDetector.java | 17 ++---
.../internal/cache/execute/AbstractExecution.java | 55 ++++++++------
.../cache/execute/DefaultResultCollector.java | 9 +--
.../execute/DistributedRegionFunctionExecutor.java | 30 +++++---
.../cache/execute/FunctionExecutionService.java | 25 +++---
.../internal/cache/execute/InternalExecution.java | 8 +-
.../execute/InternalFunctionExecutionService.java | 29 +++----
.../InternalFunctionExecutionServiceImpl.java | 88 ++++++++--------------
.../cache/execute/LocalResultCollectorImpl.java | 31 ++++----
.../cache/execute/MemberFunctionExecutor.java | 39 +++++-----
.../cache/execute/MultiRegionFunctionExecutor.java | 59 ++++++++++-----
.../execute/PartitionedRegionFunctionExecutor.java | 41 +++++++---
.../cache/execute/ServerFunctionExecutor.java | 43 ++++-------
.../execute/ServerRegionFunctionExecutor.java | 59 +++++++--------
.../internal/cache/snapshot/WindowedExporter.java | 15 ++--
.../mutators/MemberConfigManager.java | 8 +-
.../realizers/RegionConfigRealizer.java | 46 ++++++-----
22 files changed, 360 insertions(+), 389 deletions(-)
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/ClientFunctionTimeoutRegressionTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/ClientFunctionTimeoutRegressionTest.java
index 2fd7a6e..93c88bb 100644
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/ClientFunctionTimeoutRegressionTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/ClientFunctionTimeoutRegressionTest.java
@@ -183,14 +183,14 @@ public class ClientFunctionTimeoutRegressionTest implements Serializable {
Function<Integer> function = new CheckClientReadTimeout();
FunctionService.registerFunction(function);
- Execution<Integer, Boolean, List<Boolean>> execution;
+ Execution<Integer, Boolean, List<Boolean>> execution = null;
if (functionServiceTarget == ExecutionTarget.REGION) {
- execution = FunctionService.onRegion(clientCache.getRegion(regionName));
+ execution =
+ FunctionService.onRegion(clientCache.getRegion(regionName)).setArguments(timeout);
} else {
- execution = FunctionService.onServer(clientCache.getDefaultPool());
+ execution = FunctionService.onServer(clientCache.getDefaultPool()).setArguments(timeout);
}
- execution = execution.setArguments(timeout);
ResultCollector<Boolean, List<Boolean>> resultCollector = execution.execute(function);
@@ -212,7 +212,7 @@ public class ClientFunctionTimeoutRegressionTest implements Serializable {
*/
private static class CheckClientReadTimeout implements Function<Integer> {
- CheckClientReadTimeout() {
+ public CheckClientReadTimeout() {
// nothing
}
diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRFunctionExecutionDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRFunctionExecutionDUnitTest.java
index 55c14ed..273df33 100755
--- a/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRFunctionExecutionDUnitTest.java
+++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/cache/execute/PRFunctionExecutionDUnitTest.java
@@ -121,7 +121,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Test to validate that the function execution is successful on PR with Loner Distributed System
*/
@Test
- public void testFunctionExecution() {
+ public void testFunctionExecution() throws Exception {
Properties config = getDistributedSystemProperties();
config.setProperty(MCAST_PORT, "0");
config.setProperty(LOCATORS, "");
@@ -141,7 +141,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
}
@Test
- public void testHAFunctionExecution() {
+ public void testHAFunctionExecution() throws Exception {
Region<String, Integer> region = createPartitionedRegion(regionName, 10, 0);
Function<Void> function = new TestFunction<>(false, TestFunction.TEST_FUNCTION10);
@@ -160,7 +160,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Test remote execution by a pure accessor which doesn't have the function factory present.
*/
@Test
- public void testRemoteSingleKeyExecution_byName() {
+ public void testRemoteSingleKeyExecution_byName() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
@@ -208,7 +208,8 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* function will send Boolean as last result. factory present.
*/
@Test
- public void testLocalSingleKeyExecution_byName_FunctionInvocationTargetException() {
+ public void testLocalSingleKeyExecution_byName_FunctionInvocationTargetException()
+ throws Exception {
Region<String, Integer> region = createPartitionedRegion(regionName, 10, 0);
region.put(STRING_KEY, 1);
@@ -230,7 +231,8 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* last result.
*/
@Test
- public void testRemoteSingleKeyExecution_byName_FunctionInvocationTargetException() {
+ public void testRemoteSingleKeyExecution_byName_FunctionInvocationTargetException()
+ throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
@@ -264,7 +266,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Test remote execution by a pure accessor which doesn't have the function factory present.
*/
@Test
- public void testRemoteSingleKeyExecution_byInstance() {
+ public void testRemoteSingleKeyExecution_byInstance() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
@@ -312,7 +314,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Test remote execution of inline function by a pure accessor
*/
@Test
- public void testRemoteSingleKeyExecution_byInlineFunction() {
+ public void testRemoteSingleKeyExecution_byInlineFunction() throws Exception {
VM accessor = getHost(0).getVM(2);
VM datastore = getHost(0).getVM(3);
@@ -341,7 +343,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* present. ResultCollector = DefaultResultCollector haveResults = true;
*/
@Test
- public void testRemoteMultiKeyExecution_byName() {
+ public void testRemoteMultiKeyExecution_byName() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -413,7 +415,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
}
@Test
- public void testRemoteMultiKeyExecution_BucketMoved() {
+ public void testRemoteMultiKeyExecution_BucketMoved() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -461,7 +463,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
}
@Test
- public void testLocalMultiKeyExecution_BucketMoved() {
+ public void testLocalMultiKeyExecution_BucketMoved() throws Exception {
IgnoredException.addIgnoredException("BucketMovedException");
VM datastore0 = getHost(0).getVM(0);
@@ -599,7 +601,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
}
});
- AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(this::executeFunction);
+ AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(() -> executeFunction());
datastore0.invoke(() -> {
Thread.sleep(3_000);
@@ -651,7 +653,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
}
});
- AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(this::executeFunction);
+ AsyncInvocation<List<Boolean>> async = accessor.invokeAsync(() -> executeFunction());
datastore0.invoke(() -> {
Thread.sleep(3000);
@@ -667,7 +669,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* DefaultResultCollector haveResults = true;
*/
@Test
- public void testRemoteMultiKeyExecution_byInlineFunction() {
+ public void testRemoteMultiKeyExecution_byInlineFunction() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -718,7 +720,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* present. ResultCollector = CustomResultCollector haveResults = true;
*/
@Test
- public void testRemoteMultiKeyExecutionWithCollector_byName() {
+ public void testRemoteMultiKeyExecutionWithCollector_byName() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -775,7 +777,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* present. ResultCollector = DefaultResultCollector haveResults = false;
*/
@Test
- public void testRemoteMultiKeyExecutionNoResult_byName() {
+ public void testRemoteMultiKeyExecutionNoResult_byName() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -820,7 +822,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
ResultCollector<Void, Void> resultCollector =
dataSet.withFilter(keySet).setArguments(true).execute(function.getId());
- assertThatThrownBy(resultCollector::getResult).isInstanceOf(FunctionException.class)
+ assertThatThrownBy(() -> resultCollector.getResult()).isInstanceOf(FunctionException.class)
.hasMessageStartingWith(
String.format("Cannot %s result as the Function#hasResult() is false",
"return any"));
@@ -833,7 +835,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* milliseconds expected result to be 0.(as the execution gets the timeout)
*/
@Test
- public void testRemoteMultiKeyExecution_timeout() {
+ public void testRemoteMultiKeyExecution_timeout() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -889,7 +891,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* present. ResultCollector = CustomResultCollector haveResults = false;
*/
@Test
- public void testRemoteMultiKeyExecutionWithCollectorNoResult_byName() {
+ public void testRemoteMultiKeyExecutionWithCollectorNoResult_byName() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -936,7 +938,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
ResultCollector<Object, List<Object>> resultCollector =
dataSet.withFilter(keySet).setArguments(true).execute(function.getId());
- assertThatThrownBy(resultCollector::getResult).isInstanceOf(FunctionException.class)
+ assertThatThrownBy(() -> resultCollector.getResult()).isInstanceOf(FunctionException.class)
.hasMessageStartingWith(
String.format("Cannot %s result as the Function#hasResult() is false",
"return any"));
@@ -948,7 +950,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* present.
*/
@Test
- public void testRemoteMultiKeyExecution_byInstance() {
+ public void testRemoteMultiKeyExecution_byInstance() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -1022,7 +1024,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Test bucketFilter functionality
*/
@Test
- public void testBucketFilter_1() {
+ public void testBucketFilter_1() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -1104,7 +1106,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
}
@Test
- public void testBucketFilterOverride() {
+ public void testBucketFilterOverride() throws Exception {
VM accessor = getHost(0).getVM(3);
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
@@ -1160,7 +1162,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* DefaultResultCollector haveResult = true
*/
@Test
- public void testLocalMultiKeyExecution_byName() {
+ public void testLocalMultiKeyExecution_byName() throws Exception {
PartitionedRegion pr = createPartitionedRegion(regionName, 10, 0);
Set<String> keySet = new HashSet<>();
@@ -1208,7 +1210,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Test ability to execute a multi-key function by a local data store
*/
@Test
- public void testLocalMultiKeyExecution_byInstance() {
+ public void testLocalMultiKeyExecution_byInstance() throws Exception {
PartitionedRegion pr = createPartitionedRegion(regionName, 10, 0);
Set<String> keySet = new HashSet<>();
@@ -1257,7 +1259,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* works correctly such that there is not extra execution
*/
@Test
- public void testMultiKeyExecutionOnASingleBucket_byName() {
+ public void testMultiKeyExecutionOnASingleBucket_byName() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
@@ -1336,7 +1338,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* works correctly such that there is not extra execution
*/
@Test
- public void testMultiKeyExecutionOnASingleBucket_byInstance() {
+ public void testMultiKeyExecutionOnASingleBucket_byInstance() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
@@ -1414,7 +1416,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Ensure that the execution is happening all the PR as a whole
*/
@Test
- public void testExecutionOnAllNodes_byName() {
+ public void testExecutionOnAllNodes_byName() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
@@ -1475,7 +1477,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Ensure that the execution is happening all the PR as a whole
*/
@Test
- public void testExecutionOnAllNodes_byInstance() {
+ public void testExecutionOnAllNodes_byInstance() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
@@ -1535,7 +1537,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Ensure that the execution of inline function is happening all the PR as a whole
*/
@Test
- public void testExecutionOnAllNodes_byInlineFunction() {
+ public void testExecutionOnAllNodes_byInlineFunction() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
@@ -1590,7 +1592,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* LocalDataSet
*/
@Test
- public void testExecutionOnAllNodes_LocalReadPR() {
+ public void testExecutionOnAllNodes_LocalReadPR() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
@@ -1655,7 +1657,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* LocalDataSet
*/
@Test
- public void testExecutionOnMultiNodes_LocalReadPR() {
+ public void testExecutionOnMultiNodes_LocalReadPR() throws Exception {
VM datastore0 = getHost(0).getVM(0);
VM datastore1 = getHost(0).getVM(1);
VM datastore2 = getHost(0).getVM(2);
@@ -1758,7 +1760,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Assert the {@link RegionFunctionContext} yields the proper objects.
*/
@Test
- public void testLocalDataContext() {
+ public void testLocalDataContext() throws Exception {
VM accessor = getHost(0).getVM(1);
VM datastore1 = getHost(0).getVM(2);
VM datastore2 = getHost(0).getVM(3);
@@ -1841,7 +1843,7 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
* Assert the {@link RegionFunctionContext} yields the proper objects.
*/
@Test
- public void testLocalDataContextWithColocation() {
+ public void testLocalDataContextWithColocation() throws Exception {
VM accessor = getHost(0).getVM(1);
VM datastore1 = getHost(0).getVM(2);
VM datastore2 = getHost(0).getVM(3);
@@ -1971,9 +1973,8 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
}
};
- Execution<Boolean, Boolean, List<Boolean>> execution = FunctionService.onRegion(rootRegion);
ResultCollector<Boolean, List<Boolean>> resultCollector =
- execution.withFilter(createKeySet(key1)).execute(function);
+ FunctionService.onRegion(rootRegion).withFilter(createKeySet(key1)).execute(function);
assertThat(resultCollector.getResult()).hasSize(1).containsExactly(true);
}
@@ -2125,7 +2126,9 @@ public class PRFunctionExecutionDUnitTest extends CacheTestCase {
private Set<String> createKeySet(final String... keys) {
Set<String> keySet = new HashSet<>();
- Collections.addAll(keySet, keys);
+ for (String key : keys) {
+ keySet.add(key);
+ }
return keySet;
}
diff --git a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java
index bfd76cf..fc6efc9 100644
--- a/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java
+++ b/geode-core/src/integrationTest/java/org/apache/geode/internal/cache/execute/FunctionExecutionOnLonerRegressionTest.java
@@ -26,7 +26,6 @@ import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
-import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -65,7 +64,6 @@ public class FunctionExecutionOnLonerRegressionTest {
private Region<String, String> region;
private Set<String> keysForGet;
private Set<String> expectedValues;
- private Cache cache;
@Before
public void setUp() {
@@ -78,7 +76,7 @@ public class FunctionExecutionOnLonerRegressionTest {
DistributionManager dm = ds.getDistributionManager();
assertThat(dm).isInstanceOf(LonerDistributionManager.class);
- cache = CacheFactory.create(ds);
+ Cache cache = CacheFactory.create(ds);
RegionFactory<String, String> regionFactory = cache.createRegionFactory(PARTITION);
region = regionFactory.create("region");
@@ -96,11 +94,6 @@ public class FunctionExecutionOnLonerRegressionTest {
}
}
- @After
- public void tearDown() throws Exception {
- cache.close();
- }
-
private Properties getDistributedSystemProperties() {
Properties config = new Properties();
config.setProperty(MCAST_PORT, "0");
@@ -111,10 +104,9 @@ public class FunctionExecutionOnLonerRegressionTest {
}
@Test
- public void executeFunctionOnLonerShouldNotThrowClassCastException() {
+ public void executeFunctionOnLonerShouldNotThrowClassCastException() throws Exception {
Execution<Void, Collection<String>, Collection<String>> execution =
- FunctionService.onRegion(region);
- execution = execution.withFilter(keysForGet);
+ FunctionService.onRegion(region).withFilter(keysForGet);
ResultCollector<Collection<String>, Collection<String>> resultCollector =
execution.execute(new TestFunction());
assertThat(resultCollector.getResult())
diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/Execution.java b/geode-core/src/main/java/org/apache/geode/cache/execute/Execution.java
index 5f89c1d..cec0bbc 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/execute/Execution.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/execute/Execution.java
@@ -26,16 +26,16 @@ import org.apache.geode.cache.LowMemoryException;
* This interface is implemented by GemFire. To obtain an instance of it use
* {@link FunctionService}.
*
- * @param <ArgumentT> The type of the argument passed into the function, if any
- * @param <ReturnT> The type of results sent by the function
- * @param <AggregatorT> The type of the aggregated result returned by the ResultCollector
+ * @param <IN> The type of the argument passed into the function, if any
+ * @param <OUT> The type of results sent by the function
+ * @param <AGG> The type of the aggregated result returned by the ResultCollector
*
* @since GemFire 6.0
*
* @see FunctionService
* @see Function
*/
-public interface Execution<ArgumentT, ReturnT, AggregatorT> {
+public interface Execution<IN, OUT, AGG> {
/**
* Specifies a data filter of routing objects for selecting the GemFire members to execute the
@@ -51,7 +51,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> {
* {@link FunctionService#onRegion(org.apache.geode.cache.Region)}
* @since GemFire 6.0
*/
- Execution<ArgumentT, ReturnT, AggregatorT> withFilter(Set<?> filter);
+ Execution<IN, OUT, AGG> withFilter(Set<?> filter);
/**
* Specifies the user data passed to the function when it is executed. The function can retrieve
@@ -63,7 +63,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> {
* @since Geode 1.2
*
*/
- Execution<ArgumentT, ReturnT, AggregatorT> setArguments(ArgumentT args);
+ Execution<IN, OUT, AGG> setArguments(IN args);
/**
* Specifies the user data passed to the function when it is executed. The function can retrieve
@@ -76,7 +76,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> {
* @deprecated use {@link #setArguments(Object)} instead
*
*/
- Execution<ArgumentT, ReturnT, AggregatorT> withArgs(ArgumentT args);
+ Execution<IN, OUT, AGG> withArgs(IN args);
/**
* Specifies the {@link ResultCollector} that will receive the results after the function has been
@@ -88,8 +88,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> {
* @see ResultCollector
* @since GemFire 6.0
*/
- Execution<ArgumentT, ReturnT, AggregatorT> withCollector(
- ResultCollector<ReturnT, AggregatorT> rc);
+ Execution<IN, OUT, AGG> withCollector(ResultCollector<OUT, AGG> rc);
/**
* Executes the function using its {@linkplain Function#getId() id}
@@ -105,7 +104,7 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> {
*
* @since GemFire 6.0
*/
- ResultCollector<ReturnT, AggregatorT> execute(String functionId) throws FunctionException;
+ ResultCollector<OUT, AGG> execute(String functionId) throws FunctionException;
/**
* Executes the function instance provided.
@@ -122,5 +121,5 @@ public interface Execution<ArgumentT, ReturnT, AggregatorT> {
*
* @since GemFire 6.0
*/
- ResultCollector<ReturnT, AggregatorT> execute(Function function) throws FunctionException;
+ ResultCollector<OUT, AGG> execute(Function function) throws FunctionException;
}
diff --git a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java b/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java
index 9c5262f..9757caa 100755
--- a/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/execute/FunctionService.java
@@ -72,8 +72,7 @@ public class FunctionService {
* @throws FunctionException if the region passed in is null
* @since GemFire 6.0
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegion(
- Region region) {
+ public static Execution onRegion(Region region) {
return getFunctionExecutionService().onRegion(region);
}
@@ -88,8 +87,7 @@ public class FunctionService {
* @throws FunctionException if Pool instance passed in is null
* @since GemFire 6.0
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(
- Pool pool) {
+ public static Execution onServer(Pool pool) {
return getFunctionExecutionService().onServer(pool);
}
@@ -102,8 +100,7 @@ public class FunctionService {
* @throws FunctionException if Pool instance passed in is null
* @since GemFire 6.0
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(
- Pool pool) {
+ public static Execution onServers(Pool pool) {
return getFunctionExecutionService().onServers(pool);
}
@@ -120,8 +117,7 @@ public class FunctionService {
* pool
* @since GemFire 6.5
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(
- RegionService regionService) {
+ public static Execution onServer(RegionService regionService) {
return getFunctionExecutionService().onServer(regionService);
}
@@ -136,8 +132,7 @@ public class FunctionService {
* pool
* @since GemFire 6.5
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(
- RegionService regionService) {
+ public static Execution onServers(RegionService regionService) {
return getFunctionExecutionService().onServers(regionService);
}
@@ -151,8 +146,7 @@ public class FunctionService {
* @throws FunctionException if distributedMember is null
* @since GemFire 7.0
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- DistributedMember distributedMember) {
+ public static Execution onMember(DistributedMember distributedMember) {
return getFunctionExecutionService().onMember(distributedMember);
}
@@ -171,8 +165,7 @@ public class FunctionService {
* @throws FunctionException if no members are found belonging to the provided groups
* @since GemFire 7.0
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- String... groups) {
+ public static Execution onMembers(String... groups) {
return getFunctionExecutionService().onMembers(groups);
}
@@ -185,8 +178,7 @@ public class FunctionService {
* @throws FunctionException if distributedMembers is null
* @since GemFire 7.0
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- Set<DistributedMember> distributedMembers) {
+ public static Execution onMembers(Set<DistributedMember> distributedMembers) {
return getFunctionExecutionService().onMembers(distributedMembers);
}
@@ -201,8 +193,7 @@ public class FunctionService {
* @throws FunctionException if no members are found belonging to the provided groups
* @since GemFire 7.0
*/
- public static <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- String... groups) {
+ public static Execution onMember(String... groups) {
return getFunctionExecutionService().onMember(groups);
}
diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/deadlock/GemFireDeadlockDetector.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/deadlock/GemFireDeadlockDetector.java
index 007c028..1fa4d63 100644
--- a/geode-core/src/main/java/org/apache/geode/distributed/internal/deadlock/GemFireDeadlockDetector.java
+++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/deadlock/GemFireDeadlockDetector.java
@@ -67,7 +67,7 @@ public class GemFireDeadlockDetector {
@Override
public synchronized Serializable getResult(long timeout, TimeUnit unit)
- throws FunctionException {
+ throws FunctionException, InterruptedException {
return null;
}
@@ -91,18 +91,15 @@ public class GemFireDeadlockDetector {
};
- Execution<DistributedMember, HashSet<Dependency>, Serializable> onMembersExecution;
- Execution<DistributedMember, HashSet<Dependency>, Serializable> withCollectorExecution;
+ Execution execution;
if (targetMembers != null) {
- onMembersExecution = FunctionService.onMembers(targetMembers);
- withCollectorExecution = onMembersExecution.withCollector(collector);
+ execution = FunctionService.onMembers(targetMembers).withCollector(collector);
} else {
- onMembersExecution = FunctionService.onMembers();
- withCollectorExecution = onMembersExecution.withCollector(collector);
+ execution = FunctionService.onMembers().withCollector(collector);
}
- ((AbstractExecution) withCollectorExecution).setIgnoreDepartedMembers(true);
- collector = withCollectorExecution.execute(new CollectDependencyFunction());
+ ((AbstractExecution) execution).setIgnoreDepartedMembers(true);
+ collector = execution.execute(new CollectDependencyFunction());
// Wait for results
collector.getResult();
@@ -130,7 +127,7 @@ public class GemFireDeadlockDetector {
InternalDistributedMember member = instance.getDistributedMember();
Set<Dependency> dependencies = DeadlockDetector.collectAllDependencies(member);
- context.getResultSender().lastResult(dependencies);
+ context.getResultSender().lastResult((Serializable) dependencies);
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
index d015be6..c2378eb 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/AbstractExecution.java
@@ -49,8 +49,7 @@ import org.apache.geode.internal.logging.LogService;
* @since GemFire 5.8LA
*
*/
-public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
- implements InternalExecution<ArgumentT, ReturnT, AggregatorT> {
+public abstract class AbstractExecution implements InternalExecution {
private static final Logger logger = LogService.getLogger();
@@ -70,7 +69,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
protected volatile boolean isClientServerMode = false;
- protected Set<String> failedNodes = new HashSet<>();
+ protected Set<String> failedNodes = new HashSet<String>();
protected boolean isFnSerializationReqd;
@@ -99,7 +98,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
@MakeNotStatic
private static final ConcurrentHashMap<String, byte[]> idToFunctionAttributes =
- new ConcurrentHashMap<>();
+ new ConcurrentHashMap<String, byte[]>();
public static final byte NO_HA_NO_HASRESULT_NO_OPTIMIZEFORWRITE = 0;
@@ -206,7 +205,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
return this.filter;
}
- public AbstractExecution<ArgumentT, ReturnT, AggregatorT> setIsReExecute() {
+ public AbstractExecution setIsReExecute() {
this.isReExecute = true;
if (this.executionNodesListener != null) {
this.executionNodesListener.reset();
@@ -259,7 +258,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
public void executeFunctionOnLocalPRNode(final Function fn, final FunctionContext cx,
final PartitionedRegionFunctionResultSender sender, DistributionManager dm, boolean isTx) {
if (dm instanceof ClusterDistributionManager && !isTx) {
- if (ServerConnection.isExecuteFunctionOnLocalNodeOnly() == 1) {
+ if (ServerConnection.isExecuteFunctionOnLocalNodeOnly().byteValue() == 1) {
ServerConnection.executeFunctionOnLocalNodeOnly((byte) 3);// executed locally
executeFunctionLocally(fn, cx, sender, dm);
if (!sender.isLastResultReceived() && fn.hasResult()) {
@@ -270,12 +269,15 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
} else {
final ClusterDistributionManager newDM = (ClusterDistributionManager) dm;
- newDM.getFunctionExecutor().execute(() -> {
- executeFunctionLocally(fn, cx, sender, newDM);
- if (!sender.isLastResultReceived() && fn.hasResult()) {
- ((InternalResultSender) sender).setException(new FunctionException(
- String.format("The function, %s, did not send last result",
- fn.getId())));
+ newDM.getFunctionExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ executeFunctionLocally(fn, cx, sender, newDM);
+ if (!sender.isLastResultReceived() && fn.hasResult()) {
+ ((InternalResultSender) sender).setException(new FunctionException(
+ String.format("The function, %s, did not send last result",
+ fn.getId())));
+ }
}
});
}
@@ -296,12 +298,15 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
final ResultSender sender, DistributionManager dm, final boolean isTx) {
if (dm instanceof ClusterDistributionManager && !isTx) {
final ClusterDistributionManager newDM = (ClusterDistributionManager) dm;
- newDM.getFunctionExecutor().execute(() -> {
- executeFunctionLocally(fn, cx, sender, newDM);
- if (!((InternalResultSender) sender).isLastResultReceived() && fn.hasResult()) {
- ((InternalResultSender) sender).setException(new FunctionException(
- String.format("The function, %s, did not send last result",
- fn.getId())));
+ newDM.getFunctionExecutor().execute(new Runnable() {
+ @Override
+ public void run() {
+ executeFunctionLocally(fn, cx, sender, newDM);
+ if (!((InternalResultSender) sender).isLastResultReceived() && fn.hasResult()) {
+ ((InternalResultSender) sender).setException(new FunctionException(
+ String.format("The function, %s, did not send last result",
+ fn.getId())));
+ }
}
});
} else {
@@ -330,7 +335,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
fn.execute(cx);
stats.endFunctionExecution(start, fn.hasResult());
} catch (FunctionInvocationTargetException fite) {
- FunctionException functionException;
+ FunctionException functionException = null;
if (fn.isHA()) {
functionException =
new FunctionException(new InternalFunctionInvocationTargetException(fite.getMessage()));
@@ -339,7 +344,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
}
handleException(functionException, fn, cx, sender, dm);
} catch (BucketMovedException bme) {
- FunctionException functionException;
+ FunctionException functionException = null;
if (fn.isHA()) {
functionException =
new FunctionException(new InternalFunctionInvocationTargetException(bme));
@@ -357,7 +362,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public ResultCollector<ReturnT, AggregatorT> execute(final String functionName) {
+ public ResultCollector execute(final String functionName) {
if (functionName == null) {
throw new FunctionException(
"The input function for the execute function request is null");
@@ -374,7 +379,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public ResultCollector<ReturnT, AggregatorT> execute(Function function) throws FunctionException {
+ public ResultCollector execute(Function function) throws FunctionException {
if (function == null) {
throw new FunctionException(
"The input function for the execute function request is null");
@@ -424,7 +429,7 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
return this.ignoreDepartedMembers;
}
- protected abstract ResultCollector<ReturnT, AggregatorT> executeFunction(Function fn);
+ protected abstract ResultCollector executeFunction(Function fn);
/**
* validates whether a function should execute in presence of transaction and HeapCritical
@@ -437,8 +442,8 @@ public abstract class AbstractExecution<ArgumentT, ReturnT, AggregatorT>
*/
public abstract void validateExecution(Function function, Set targetMembers);
- public LocalResultCollector<ReturnT, AggregatorT> getLocalResultCollector(Function function,
- final ResultCollector<ReturnT, AggregatorT> rc) {
+ public LocalResultCollector<?, ?> getLocalResultCollector(Function function,
+ final ResultCollector<?, ?> rc) {
if (rc instanceof LocalResultCollector) {
return (LocalResultCollector) rc;
} else {
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java
index ea5b8b3..2d8e167 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DefaultResultCollector.java
@@ -15,7 +15,6 @@
package org.apache.geode.internal.cache.execute;
import java.util.ArrayList;
-import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.geode.cache.execute.Function;
@@ -33,9 +32,9 @@ import org.apache.geode.distributed.DistributedMember;
* @since GemFire 6.0
*
*/
-public class DefaultResultCollector implements ResultCollector<Object, List<Object>> {
+public class DefaultResultCollector implements ResultCollector {
- private List<Object> resultList = new ArrayList<>();
+ private ArrayList<Object> resultList = new ArrayList<Object>();
public DefaultResultCollector() {}
@@ -58,7 +57,7 @@ public class DefaultResultCollector implements ResultCollector<Object, List<Obje
* @throws FunctionException if something goes wrong while retrieving the result
*/
@Override
- public List<Object> getResult() throws FunctionException {
+ public Object getResult() throws FunctionException {
return this.resultList; // this is full result
}
@@ -82,7 +81,7 @@ public class DefaultResultCollector implements ResultCollector<Object, List<Obje
* @throws FunctionException if something goes wrong while retrieving the result
*/
@Override
- public List<Object> getResult(long timeout, TimeUnit unit) throws FunctionException {
+ public Object getResult(long timeout, TimeUnit unit) throws FunctionException {
return this.resultList;
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
index a745d76..18e6f84 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/DistributedRegionFunctionExecutor.java
@@ -45,8 +45,7 @@ import org.apache.geode.internal.cache.control.MemoryThresholds;
* @since GemFire 5.8 LA
*
*/
-public class DistributedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
- extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> {
+public class DistributedRegionFunctionExecutor extends AbstractExecution {
private final LocalRegion region;
@@ -61,6 +60,16 @@ public class DistributedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
this.region = (LocalRegion) r;
}
+ private DistributedRegionFunctionExecutor(DistributedRegionFunctionExecutor drfe) {
+ super(drfe);
+ this.region = drfe.region;
+ if (drfe.filter != null) {
+ this.filter.clear();
+ this.filter.addAll(drfe.filter);
+ }
+ this.sender = drfe.sender;
+ }
+
public DistributedRegionFunctionExecutor(DistributedRegion region, Set filter2, Object args,
MemberMappedArgument memberMappedArg, ServerToClientFunctionResultSender resultSender) {
if (args != null) {
@@ -212,8 +221,7 @@ public class DistributedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter(
- Set<Integer> bucketIDs) {
+ public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
if (bucketIDs != null && !bucketIDs.isEmpty()) {
throw new IllegalArgumentException(
String.format("Buckets as filter cannot be applied to a non partitioned region: %s",
@@ -274,12 +282,14 @@ public class DistributedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
@Override
public String toString() {
- return "[ DistributedRegionFunctionExecutor:"
- + "args="
- + this.args
- + ";region="
- + this.region.getName()
- + "]";
+ final StringBuffer buf = new StringBuffer();
+ buf.append("[ DistributedRegionFunctionExecutor:");
+ buf.append("args=");
+ buf.append(this.args);
+ buf.append(";region=");
+ buf.append(this.region.getName());
+ buf.append("]");
+ return buf.toString();
}
/*
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java
index bcc481f..6916728 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/FunctionExecutionService.java
@@ -50,8 +50,7 @@ public interface FunctionExecutionService {
* @throws FunctionException if the region passed in is null
* @since GemFire 6.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegion(
- Region region);
+ Execution onRegion(Region region);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -64,7 +63,7 @@ public interface FunctionExecutionService {
* @throws FunctionException if Pool instance passed in is null
* @since GemFire 6.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(Pool pool);
+ Execution onServer(Pool pool);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -75,7 +74,7 @@ public interface FunctionExecutionService {
* @throws FunctionException if Pool instance passed in is null
* @since GemFire 6.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(Pool pool);
+ Execution onServers(Pool pool);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -90,8 +89,7 @@ public interface FunctionExecutionService {
* pool
* @since GemFire 6.5
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(
- RegionService regionService);
+ Execution onServer(RegionService regionService);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -104,8 +102,7 @@ public interface FunctionExecutionService {
* pool
* @since GemFire 6.5
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(
- RegionService regionService);
+ Execution onServers(RegionService regionService);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -117,8 +114,7 @@ public interface FunctionExecutionService {
* @throws FunctionException if distributedMember is null
* @since GemFire 7.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- DistributedMember distributedMember);
+ Execution onMember(DistributedMember distributedMember);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -135,8 +131,7 @@ public interface FunctionExecutionService {
* @throws FunctionException if no members are found belonging to the provided groups
* @since GemFire 7.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- String... groups);
+ Execution onMembers(String... groups);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -147,8 +142,7 @@ public interface FunctionExecutionService {
* @throws FunctionException if distributedMembers is null
* @since GemFire 7.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- Set<DistributedMember> distributedMembers);
+ Execution onMembers(Set<DistributedMember> distributedMembers);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -161,8 +155,7 @@ public interface FunctionExecutionService {
* @throws FunctionException if no members are found belonging to the provided groups
* @since GemFire 7.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- String... groups);
+ Execution onMember(String... groups);
/**
* Returns the {@link Function} defined by the functionId, returns null if no function is found
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalExecution.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalExecution.java
index 635cd19..ecb088f 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalExecution.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalExecution.java
@@ -28,11 +28,9 @@ import org.apache.geode.cache.execute.ResultCollector;
* @since GemFire 5.8LA
*
*/
-public interface InternalExecution<ArgumentT, ReturnT, AggregatorT>
- extends Execution<ArgumentT, ReturnT, AggregatorT> {
+public interface InternalExecution extends Execution {
- InternalExecution<ArgumentT, ReturnT, AggregatorT> withMemberMappedArgument(
- MemberMappedArgument argument);
+ InternalExecution withMemberMappedArgument(MemberMappedArgument argument);
/**
* Specifies a filter of bucketIDs for selecting the GemFire members to execute the function on.
@@ -46,7 +44,7 @@ public interface InternalExecution<ArgumentT, ReturnT, AggregatorT>
* {@link FunctionService#onRegion(org.apache.geode.cache.Region)}
* @since Geode 1.0
*/
- InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter(Set<Integer> bucketIDs);
+ InternalExecution withBucketFilter(Set<Integer> bucketIDs);
/**
* If true, function execution waits for all exceptions from target nodes <br>
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java
index 24fc6f8..6fb529a 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionService.java
@@ -52,8 +52,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
*
* @see MultiRegionFunctionContext
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegions(
- Set<Region> regions);
+ Execution onRegions(Set<Region> regions);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -66,8 +65,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
* pool
* @since GemFire 6.5
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(
- RegionService regionService, String... groups);
+ Execution onServers(RegionService regionService, String... groups);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -82,8 +80,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
* pool
* @since GemFire 6.5
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(
- RegionService regionService, String... groups);
+ Execution onServer(RegionService regionService, String... groups);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -94,8 +91,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
* @throws FunctionException if Pool instance passed in is null
* @since GemFire 6.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(Pool pool,
- String... groups);
+ Execution onServers(Pool pool, String... groups);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -108,8 +104,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
* @throws FunctionException if Pool instance passed in is null
* @since GemFire 6.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(Pool pool,
- String... groups);
+ Execution onServer(Pool pool, String... groups);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -123,9 +118,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
* @since GemFire 6.0
*
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- DistributedSystem system,
- DistributedMember distributedMember);
+ Execution onMember(DistributedSystem system, DistributedMember distributedMember);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -137,8 +130,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
* @throws FunctionException if DistributedSystem instance passed is null
* @since GemFire 6.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- DistributedSystem system, String... groups);
+ Execution onMembers(DistributedSystem system, String... groups);
/**
* Uses {@code RANDOM_onMember} for tests.
@@ -146,8 +138,7 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
* <p>
* TODO: maybe merge with {@link #onMembers(DistributedSystem, String...)}
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- DistributedSystem system, String... groups);
+ Execution onMember(DistributedSystem system, String... groups);
/**
* Returns an {@link Execution} object that can be used to execute a data independent function on
@@ -159,7 +150,5 @@ public interface InternalFunctionExecutionService extends FunctionExecutionServi
* @throws FunctionException if DistributedSystem instance passed is null
* @since GemFire 6.0
*/
- <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- DistributedSystem system,
- Set<DistributedMember> distributedMembers);
+ Execution onMembers(DistributedSystem system, Set<DistributedMember> distributedMembers);
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java
index 954e344..3416c93 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/InternalFunctionExecutionServiceImpl.java
@@ -67,50 +67,42 @@ public class InternalFunctionExecutionServiceImpl
// FunctionExecutionService API ----------------------------------------------------------------
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(
- Pool pool) {
+ public Execution onServer(Pool pool) {
return onServer(pool, EMPTY_GROUPS);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(
- Pool pool) {
+ public Execution onServers(Pool pool) {
return onServers(pool, EMPTY_GROUPS);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(
- RegionService regionService) {
+ public Execution onServer(RegionService regionService) {
return onServer(regionService, EMPTY_GROUPS);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(
- RegionService regionService) {
+ public Execution onServers(RegionService regionService) {
return onServers(regionService, EMPTY_GROUPS);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- DistributedMember distributedMember) {
+ public Execution onMember(DistributedMember distributedMember) {
return onMember(getDistributedSystem(), distributedMember);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- String... groups) {
+ public Execution onMembers(String... groups) {
return onMembers(getDistributedSystem(), groups);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- Set<DistributedMember> distributedMembers) {
+ public Execution onMembers(Set<DistributedMember> distributedMembers) {
return onMembers(getDistributedSystem(), distributedMembers);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- String... groups) {
+ public Execution onMember(String... groups) {
return onMember(getDistributedSystem(), groups);
}
@@ -119,8 +111,7 @@ public class InternalFunctionExecutionServiceImpl
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegion(
- Region region) {
+ public Execution onRegion(Region region) {
if (region == null) {
throw new FunctionException("Region instance passed is null");
}
@@ -146,12 +137,12 @@ public class InternalFunctionExecutionServiceImpl
}
if (isClientRegion(region)) {
- return new ServerRegionFunctionExecutor<>(region, proxyCache);
+ return new ServerRegionFunctionExecutor(region, proxyCache);
}
if (PartitionRegionHelper.isPartitionedRegion(region)) {
- return new PartitionedRegionFunctionExecutor<>(region);
+ return new PartitionedRegionFunctionExecutor(region);
}
- return new DistributedRegionFunctionExecutor<>(region);
+ return new DistributedRegionFunctionExecutor(region);
}
@Override
@@ -214,8 +205,7 @@ public class InternalFunctionExecutionServiceImpl
// InternalFunctionExecutionService OnServerGroups API -----------------------------------------
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(
- Pool pool, String... groups) {
+ public Execution onServer(Pool pool, String... groups) {
if (pool == null) {
throw new FunctionException(
String.format("%s passed is null", "Pool instance "));
@@ -225,12 +215,11 @@ public class InternalFunctionExecutionServiceImpl
throw new UnsupportedOperationException();
}
- return new ServerFunctionExecutor<>(pool, false, groups);
+ return new ServerFunctionExecutor(pool, false, groups);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(
- Pool pool, String... groups) {
+ public Execution onServers(Pool pool, String... groups) {
if (pool == null) {
throw new FunctionException(
String.format("%s passed is null", "Pool instance "));
@@ -240,13 +229,11 @@ public class InternalFunctionExecutionServiceImpl
throw new UnsupportedOperationException();
}
- return new ServerFunctionExecutor<>(pool, true, groups);
+ return new ServerFunctionExecutor(pool, true, groups);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServer(
- RegionService regionService,
- String... groups) {
+ public Execution onServer(RegionService regionService, String... groups) {
if (regionService == null) {
throw new FunctionException(String.format("%s passed is null",
"RegionService instance "));
@@ -262,16 +249,13 @@ public class InternalFunctionExecutionServiceImpl
}
} else {
ProxyCache proxyCache = (ProxyCache) regionService;
- return new ServerFunctionExecutor<>(proxyCache.getUserAttributes().getPool(), false,
- proxyCache,
+ return new ServerFunctionExecutor(proxyCache.getUserAttributes().getPool(), false, proxyCache,
groups);
}
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onServers(
- RegionService regionService,
- String... groups) {
+ public Execution onServers(RegionService regionService, String... groups) {
if (regionService == null) {
throw new FunctionException(String.format("%s passed is null",
"RegionService instance "));
@@ -287,8 +271,7 @@ public class InternalFunctionExecutionServiceImpl
}
} else {
ProxyCache proxyCache = (ProxyCache) regionService;
- return new ServerFunctionExecutor<>(proxyCache.getUserAttributes().getPool(), true,
- proxyCache,
+ return new ServerFunctionExecutor(proxyCache.getUserAttributes().getPool(), true, proxyCache,
groups);
}
}
@@ -296,9 +279,7 @@ public class InternalFunctionExecutionServiceImpl
// InternalFunctionExecutionService InDistributedSystem API ------------------------------------
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- DistributedSystem system,
- DistributedMember distributedMember) {
+ public Execution onMember(DistributedSystem system, DistributedMember distributedMember) {
if (system == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedSystem instance "));
@@ -307,19 +288,17 @@ public class InternalFunctionExecutionServiceImpl
throw new FunctionException(String.format("%s passed is null",
"DistributedMember instance "));
}
- return new MemberFunctionExecutor<>(system, distributedMember);
+ return new MemberFunctionExecutor(system, distributedMember);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- DistributedSystem system,
- String... groups) {
+ public Execution onMembers(DistributedSystem system, String... groups) {
if (system == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedSystem instance "));
}
if (groups.length == 0) {
- return new MemberFunctionExecutor<>(system);
+ return new MemberFunctionExecutor(system);
}
Set<DistributedMember> members = new HashSet<>();
for (String group : groups) {
@@ -329,13 +308,11 @@ public class InternalFunctionExecutionServiceImpl
throw new FunctionException(String.format("No members found in group(s) %s",
Arrays.toString(groups)));
}
- return new MemberFunctionExecutor<>(system, members);
+ return new MemberFunctionExecutor(system, members);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMember(
- DistributedSystem system,
- String... groups) {
+ public Execution onMember(DistributedSystem system, String... groups) {
if (system == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedSystem instance "));
@@ -356,13 +333,11 @@ public class InternalFunctionExecutionServiceImpl
throw new FunctionException(String.format("No members found in group(s) %s",
Arrays.toString(groups)));
}
- return new MemberFunctionExecutor<>(system, members);
+ return new MemberFunctionExecutor(system, members);
}
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onMembers(
- DistributedSystem system,
- Set<DistributedMember> distributedMembers) {
+ public Execution onMembers(DistributedSystem system, Set<DistributedMember> distributedMembers) {
if (system == null) {
throw new FunctionException(String.format("%s passed is null",
"DistributedSystem instance "));
@@ -371,14 +346,13 @@ public class InternalFunctionExecutionServiceImpl
throw new FunctionException(String.format("%s passed is null",
"distributedMembers set "));
}
- return new MemberFunctionExecutor<>(system, distributedMembers);
+ return new MemberFunctionExecutor(system, distributedMembers);
}
// InternalFunctionExecutionService OnRegions API ----------------------------------------------
@Override
- public <ArgumentT, ReturnT, AggregatorT> Execution<ArgumentT, ReturnT, AggregatorT> onRegions(
- Set<Region> regions) {
+ public Execution onRegions(Set<Region> regions) {
if (regions == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
@@ -398,7 +372,7 @@ public class InternalFunctionExecutionServiceImpl
"FunctionService#onRegions() is not supported for cache clients in client server mode");
}
}
- return new MultiRegionFunctionExecutor<>(regions);
+ return new MultiRegionFunctionExecutor(regions);
}
// InternalFunctionExecutionService unregisterAllFunctions API ---------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/LocalResultCollectorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/LocalResultCollectorImpl.java
index 2c804dc..4bb2c6b 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/LocalResultCollectorImpl.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/LocalResultCollectorImpl.java
@@ -25,9 +25,9 @@ import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.distributed.DistributedMember;
import org.apache.geode.distributed.internal.ReplyProcessor21;
-public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<OUT, AGG> {
+public class LocalResultCollectorImpl implements LocalResultCollector {
- private final ResultCollector<OUT, AGG> userRC;
+ private final ResultCollector userRC;
private CountDownLatch latch = new CountDownLatch(1);
@@ -39,19 +39,18 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<
private FunctionException functionException = null;
- private Function function;
+ private Function function = null;
- private AbstractExecution<?, OUT, AGG> execution;
+ private AbstractExecution execution = null;
- public LocalResultCollectorImpl(Function function, ResultCollector<OUT, AGG> rc,
- Execution execution) {
+ public LocalResultCollectorImpl(Function function, ResultCollector rc, Execution execution) {
this.function = function;
this.userRC = rc;
this.execution = (AbstractExecution) execution;
}
@Override
- public synchronized void addResult(DistributedMember memberID, OUT resultOfSingleExecution) {
+ public synchronized void addResult(DistributedMember memberID, Object resultOfSingleExecution) {
if (resultsCleared) {
return;
}
@@ -62,7 +61,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<
if (t.getCause() != null) {
t = t.getCause();
}
- this.userRC.addResult(memberID, (OUT) t);
+ this.userRC.addResult(memberID, t);
} else {
if (!(t instanceof InternalFunctionException)) {
if (this.functionException == null) {
@@ -79,7 +78,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<
}
this.functionException.addException(t);
} else {
- this.userRC.addResult(memberID, (OUT) t.getCause());
+ this.userRC.addResult(memberID, t.getCause());
}
}
} else {
@@ -105,7 +104,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<
}
@Override
- public AGG getResult() throws FunctionException {
+ public Object getResult() throws FunctionException {
if (this.resultCollected) {
throw new FunctionException(
"Function results already collected");
@@ -124,7 +123,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<
.getCause() instanceof InternalFunctionInvocationTargetException) {
clearResults();
this.execution = this.execution.setIsReExecute();
- ResultCollector<OUT, AGG> newRc;
+ ResultCollector newRc = null;
if (execution.isFnSerializationReqd()) {
newRc = this.execution.execute(this.function);
} else {
@@ -135,12 +134,13 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<
}
throw this.functionException;
} else {
- return this.userRC.getResult();
+ Object result = this.userRC.getResult();
+ return result;
}
}
@Override
- public AGG getResult(long timeout, TimeUnit unit)
+ public Object getResult(long timeout, TimeUnit unit)
throws FunctionException, InterruptedException {
boolean resultReceived = false;
@@ -166,7 +166,7 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<
.getCause() instanceof InternalFunctionInvocationTargetException) {
clearResults();
this.execution = this.execution.setIsReExecute();
- ResultCollector<OUT, AGG> newRc;
+ ResultCollector newRc = null;
if (execution.isFnSerializationReqd()) {
newRc = this.execution.execute(this.function);
} else {
@@ -177,7 +177,8 @@ public class LocalResultCollectorImpl<OUT, AGG> implements LocalResultCollector<
}
throw this.functionException;
} else {
- return this.userRC.getResult(timeout, unit);
+ Object result = this.userRC.getResult(timeout, unit);
+ return result;
}
}
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
index 15f4a04..a18c560 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MemberFunctionExecutor.java
@@ -36,9 +36,7 @@ import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.GemFireCacheImpl;
import org.apache.geode.internal.cache.InternalCache;
-public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
- extends AbstractExecution<ArgumentT, ReturnT, AggregatorT>
- implements Execution<ArgumentT, ReturnT, AggregatorT> {
+public class MemberFunctionExecutor extends AbstractExecution {
protected InternalDistributedSystem ds;
@@ -97,7 +95,7 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@SuppressWarnings("unchecked")
- private ResultCollector<ReturnT, AggregatorT> executeFunction(final Function function,
+ private ResultCollector executeFunction(final Function function,
ResultCollector resultCollector) {
final DistributionManager dm = this.ds.getDistributionManager();
final Set dest = new HashSet(this.members);
@@ -111,8 +109,7 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
final InternalDistributedMember localVM =
this.ds.getDistributionManager().getDistributionManagerId();
- final LocalResultCollector<ReturnT, AggregatorT> localRC =
- getLocalResultCollector(function, resultCollector);
+ final LocalResultCollector<?, ?> localRC = getLocalResultCollector(function, resultCollector);
boolean remoteOnly = false;
boolean localOnly = false;
if (!dest.contains(localVM)) {
@@ -132,7 +129,7 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
boolean isTx = false;
InternalCache cache = GemFireCacheImpl.getInstance();
if (cache != null) {
- isTx = cache.getTxManager().getTXState() != null;
+ isTx = cache.getTxManager().getTXState() == null ? false : true;
}
final FunctionContext context = new FunctionContextImpl(cache, function.getId(),
getArgumentsForMember(localVM.getId()), resultSender);
@@ -140,7 +137,8 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
if (!dest.isEmpty()) {
- HashMap<InternalDistributedMember, Object> memberArgs = new HashMap<>();
+ HashMap<InternalDistributedMember, Object> memberArgs =
+ new HashMap<InternalDistributedMember, Object>();
Iterator<DistributedMember> iter = dest.iterator();
while (iter.hasNext()) {
InternalDistributedMember recip = (InternalDistributedMember) iter.next();
@@ -150,7 +148,8 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
MemberFunctionResultWaiter resultReceiver = new MemberFunctionResultWaiter(this.ds, localRC,
function, memberArgs, dest, resultSender);
- return resultReceiver.getFunctionResultFrom(dest, function, this);
+ ResultCollector reply = resultReceiver.getFunctionResultFrom(dest, function, this);
+ return reply;
}
return localRC;
}
@@ -185,7 +184,7 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- protected ResultCollector<ReturnT, AggregatorT> executeFunction(Function function) {
+ protected ResultCollector executeFunction(Function function) {
if (function.hasResult()) {
ResultCollector rc = this.rc;
if (rc == null) {
@@ -199,56 +198,54 @@ public class MemberFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> setArguments(Object args) {
+ public Execution setArguments(Object args) {
if (args == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
"args"));
}
- return new MemberFunctionExecutor<>(this, args);
+ return new MemberFunctionExecutor(this, args);
}
// Changing the object!!
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withArgs(Object args) {
+ public Execution withArgs(Object args) {
return setArguments(args);
}
// Changing the object!!
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withCollector(ResultCollector rs) {
+ public Execution withCollector(ResultCollector rs) {
if (rs == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
"Result Collector"));
}
- return new MemberFunctionExecutor<>(this, rs);
+ return new MemberFunctionExecutor(this, rs);
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withFilter(Set filter) {
+ public Execution withFilter(Set filter) {
throw new FunctionException(
String.format("Cannot specify %s for data independent functions",
"filter"));
}
@Override
- public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter(
- Set<Integer> bucketIDs) {
+ public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
throw new FunctionException(
String.format("Cannot specify %s for data independent functions",
"bucket as filter"));
}
@Override
- public InternalExecution<ArgumentT, ReturnT, AggregatorT> withMemberMappedArgument(
- MemberMappedArgument argument) {
+ public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) {
if (argument == null) {
throw new IllegalArgumentException(
String.format("The input %s for the execute function request is null",
"MemberMappedArgs"));
}
- return new MemberFunctionExecutor<>(this, argument);
+ return new MemberFunctionExecutor(this, argument);
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
index 1b1fd1f..be26cb9 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/MultiRegionFunctionExecutor.java
@@ -40,8 +40,7 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
-public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
- extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> {
+public class MultiRegionFunctionExecutor extends AbstractExecution {
private final Set<Region> regions;
@@ -51,6 +50,33 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
this.regions = regions;
}
+ private MultiRegionFunctionExecutor(MultiRegionFunctionExecutor drfe) {
+ super(drfe);
+ this.regions = drfe.regions;
+ if (drfe.filter != null) {
+ this.filter.clear();
+ this.filter.addAll(drfe.filter);
+ }
+ this.sender = drfe.sender;
+ }
+
+ private MultiRegionFunctionExecutor(Set<Region> regions, Set filter2, Object args,
+ MemberMappedArgument memberMappedArg, ServerToClientFunctionResultSender resultSender) {
+ if (args != null) {
+ this.args = args;
+ } else if (memberMappedArg != null) {
+ this.memberMappedArg = memberMappedArg;
+ this.isMemberMappedArgument = true;
+ }
+ this.sender = resultSender;
+ if (filter2 != null) {
+ this.filter.clear();
+ this.filter.addAll(filter2);
+ }
+ this.regions = regions;
+ this.isClientServerMode = true;
+ }
+
private MultiRegionFunctionExecutor(MultiRegionFunctionExecutor executor,
MemberMappedArgument argument) {
super(executor);
@@ -143,8 +169,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter(
- Set<Integer> bucketIDs) {
+ public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
throw new FunctionException(
String.format("Cannot specify %s for multi region function",
"bucket as filter"));
@@ -175,7 +200,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
final Map<InternalDistributedMember, Set<String>> memberToRegionMap =
calculateMemberToRegionMap();
final Set<InternalDistributedMember> dest =
- new HashSet<>(memberToRegionMap.keySet());
+ new HashSet<InternalDistributedMember>(memberToRegionMap.keySet());
if (dest.isEmpty()) {
throw new FunctionException(
@@ -189,7 +214,6 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
setExecutionNodes(dest);
- assert cache != null;
final InternalDistributedMember localVM = cache.getMyId();
final LocalResultCollector<?, ?> localResultCollector =
getLocalResultCollector(function, resultCollector);
@@ -208,7 +232,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
// if member is local VM
dest.remove(localVM);
Set<String> regionPathSet = memberToRegionMap.get(localVM);
- Set<Region> regions = new HashSet<>();
+ Set<Region> regions = new HashSet<Region>();
if (regionPathSet != null) {
InternalCache cache1 = GemFireCacheImpl.getInstance();
for (String regionPath : regionPathSet) {
@@ -218,12 +242,12 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
final FunctionContextImpl context =
new MultiRegionFunctionContextImpl(cache, function.getId(),
getArgumentsForMember(localVM.getId()), resultSender, regions, this.isReExecute);
- boolean isTx = cache.getTxManager().getTXState() != null;
+ boolean isTx = cache.getTxManager().getTXState() == null ? false : true;
executeFunctionOnLocalNode(function, context, resultSender, dm, isTx);
}
if (!dest.isEmpty()) {
HashMap<InternalDistributedMember, Object> memberArgs =
- new HashMap<>();
+ new HashMap<InternalDistributedMember, Object>();
for (InternalDistributedMember recip : dest) {
memberArgs.put(recip, getArgumentsForMember(recip.getId()));
}
@@ -237,9 +261,10 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
private Map<InternalDistributedMember, Set<String>> calculateMemberToRegionMap() {
- Map<InternalDistributedMember, Set<String>> memberToRegions = new HashMap<>();
+ Map<InternalDistributedMember, Set<String>> memberToRegions =
+ new HashMap<InternalDistributedMember, Set<String>>();
// nodes is maintained for node pruning logic
- Set<InternalDistributedMember> nodes = new HashSet<>();
+ Set<InternalDistributedMember> nodes = new HashSet<InternalDistributedMember>();
for (Region region : regions) {
DataPolicy dp = region.getAttributes().getDataPolicy();
if (region instanceof PartitionedRegion) {
@@ -251,7 +276,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
InternalDistributedMember localVm = cache.getMyId();
Set<String> regions = memberToRegions.get(localVm);
if (regions == null) {
- regions = new HashSet<>();
+ regions = new HashSet<String>();
}
regions.add(pr.getFullPath());
memberToRegions.put(localVm, regions);
@@ -260,7 +285,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
for (InternalDistributedMember member : prMembers) {
Set<String> regions = memberToRegions.get(member);
if (regions == null) {
- regions = new HashSet<>();
+ regions = new HashSet<String>();
}
regions.add(pr.getFullPath());
memberToRegions.put(member, regions);
@@ -280,7 +305,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
added = true;
Set<String> regions = memberToRegions.get(member);
if (regions == null) {
- regions = new HashSet<>();
+ regions = new HashSet<String>();
}
regions.add(dr.getFullPath());
memberToRegions.put(member, regions);
@@ -294,7 +319,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
.toArray()[new Random().nextInt(replicates.size())]);
Set<String> regions = memberToRegions.get(member);
if (regions == null) {
- regions = new HashSet<>();
+ regions = new HashSet<String>();
}
regions.add(dr.getFullPath());
memberToRegions.put(member, regions);
@@ -305,7 +330,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
InternalDistributedMember local = cache.getMyId();
Set<String> regions = memberToRegions.get(local);
if (regions == null) {
- regions = new HashSet<>();
+ regions = new HashSet<String>();
}
regions.add(region.getFullPath());
memberToRegions.put(local, regions);
@@ -316,7 +341,7 @@ public class MultiRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
InternalDistributedMember local = cache.getMyId();
Set<String> regions = memberToRegions.get(local);
if (regions == null) {
- regions = new HashSet<>();
+ regions = new HashSet<String>();
}
regions.add(region.getFullPath());
memberToRegions.put(local, regions);
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
index f1aea8b..27728e3 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/PartitionedRegionFunctionExecutor.java
@@ -14,6 +14,7 @@
*/
package org.apache.geode.internal.cache.execute;
+import java.util.Iterator;
import java.util.Set;
import org.apache.geode.cache.Region;
@@ -28,8 +29,7 @@ import org.apache.geode.internal.cache.InternalCache;
import org.apache.geode.internal.cache.LocalRegion;
import org.apache.geode.internal.cache.PartitionedRegion;
-public class PartitionedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
- extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> {
+public class PartitionedRegionFunctionExecutor extends AbstractExecution {
private final PartitionedRegion pr;
@@ -48,6 +48,21 @@ public class PartitionedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
this.pr = (PartitionedRegion) r;
}
+ private PartitionedRegionFunctionExecutor(PartitionedRegionFunctionExecutor prfe) {
+ super(prfe);
+ this.pr = prfe.pr;
+ this.executeOnBucketSet = prfe.executeOnBucketSet;
+ this.isPRSingleHop = prfe.isPRSingleHop;
+ this.isReExecute = prfe.isReExecute;
+ if (prfe.filter != null) {
+ this.filter.clear();
+ this.filter.addAll(prfe.filter);
+ }
+ if (prfe.sender != null) {
+ this.sender = prfe.sender;
+ }
+ }
+
private PartitionedRegionFunctionExecutor(PartitionedRegionFunctionExecutor prfe,
MemberMappedArgument argument) {
// super copies args, rc and memberMappedArgument
@@ -234,7 +249,9 @@ public class PartitionedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
bucketIDs.retainAll(actualBucketSet);
- for (int bid : bucketIDs) {
+ Iterator<Integer> it = bucketIDs.iterator();
+ while (it.hasNext()) {
+ int bid = it.next();
if (!actualBucketSet.contains(bid)) {
throw new FunctionException("Bucket " + bid + " does not exist.");
}
@@ -299,14 +316,16 @@ public class PartitionedRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
@Override
public String toString() {
- return "[ PartitionedRegionFunctionExecutor:"
- + "args="
- + this.args
- + ";filter="
- + this.filter
- + ";region="
- + this.pr.getName()
- + "]";
+ final StringBuffer buf = new StringBuffer();
+ buf.append("[ PartitionedRegionFunctionExecutor:");
+ buf.append("args=");
+ buf.append(this.args);
+ buf.append(";filter=");
+ buf.append(this.filter);
+ buf.append(";region=");
+ buf.append(this.pr.getName());
+ buf.append("]");
+ return buf.toString();
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java
index 8dcb163..9cbc4dd 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerFunctionExecutor.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.internal.cache.execute;
-import java.util.List;
import java.util.Set;
import org.apache.geode.cache.client.Pool;
@@ -33,9 +32,7 @@ import org.apache.geode.cache.execute.ResultCollector;
import org.apache.geode.internal.cache.TXManagerImpl;
import org.apache.geode.internal.cache.execute.util.SynchronizedResultCollector;
-public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
- extends AbstractExecution<ArgumentT, ReturnT, AggregatorT>
- implements Execution<ArgumentT, ReturnT, AggregatorT> {
+public class ServerFunctionExecutor extends AbstractExecution {
private PoolImpl pool;
@@ -82,9 +79,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
this.isMemberMappedArgument = true;
}
- protected ResultCollector<ReturnT, AggregatorT> executeFunction(final String functionId,
- boolean result,
- boolean isHA,
+ protected ResultCollector executeFunction(final String functionId, boolean result, boolean isHA,
boolean optimizeForWrite) {
try {
if (proxyCache != null) {
@@ -113,7 +108,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- protected ResultCollector<ReturnT, AggregatorT> executeFunction(final Function function) {
+ protected ResultCollector executeFunction(final Function function) {
byte hasResult = 0;
try {
if (proxyCache != null) {
@@ -126,7 +121,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
if (function.hasResult()) { // have Results
hasResult = 1;
if (this.rc == null) { // Default Result Collector
- ResultCollector<Object, List<Object>> defaultCollector = new DefaultResultCollector();
+ ResultCollector defaultCollector = new DefaultResultCollector();
return executeOnServer(function, defaultCollector, hasResult);
} else { // Custom Result Collector
return executeOnServer(function, this.rc, hasResult);
@@ -141,9 +136,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
- private ResultCollector<ReturnT, AggregatorT> executeOnServer(Function function,
- ResultCollector rc,
- byte hasResult) {
+ private ResultCollector executeOnServer(Function function, ResultCollector rc, byte hasResult) {
FunctionStats stats = FunctionStats.getFunctionStats(function.getId());
try {
validateExecution(function, null);
@@ -165,9 +158,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
}
- private ResultCollector<ReturnT, AggregatorT> executeOnServer(String functionId,
- ResultCollector rc,
- byte hasResult,
+ private ResultCollector executeOnServer(String functionId, ResultCollector rc, byte hasResult,
boolean isHA, boolean optimizeForWrite) {
FunctionStats stats = FunctionStats.getFunctionStats(functionId);
try {
@@ -237,54 +228,52 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withFilter(Set filter) {
+ public Execution withFilter(Set filter) {
throw new FunctionException(
String.format("Cannot specify %s for data independent functions",
"filter"));
}
@Override
- public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter(
- Set<Integer> bucketIDs) {
+ public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
throw new FunctionException(
String.format("Cannot specify %s for data independent functions",
"buckets as filter"));
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> setArguments(Object args) {
+ public Execution setArguments(Object args) {
if (args == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"args"));
}
- return new ServerFunctionExecutor<>(this, args);
+ return new ServerFunctionExecutor(this, args);
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withArgs(Object args) {
+ public Execution withArgs(Object args) {
return setArguments(args);
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withCollector(ResultCollector rs) {
+ public Execution withCollector(ResultCollector rs) {
if (rs == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"Result Collector"));
}
- return new ServerFunctionExecutor<>(this, rs);
+ return new ServerFunctionExecutor(this, rs);
}
@Override
- public InternalExecution<ArgumentT, ReturnT, AggregatorT> withMemberMappedArgument(
- MemberMappedArgument argument) {
+ public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) {
if (argument == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"MemberMapped Args"));
}
- return new ServerFunctionExecutor<>(this, argument);
+ return new ServerFunctionExecutor(this, argument);
}
@Override
@@ -295,7 +284,7 @@ public class ServerFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public ResultCollector<ReturnT, AggregatorT> execute(final String functionName) {
+ public ResultCollector execute(final String functionName) {
if (functionName == null) {
throw new FunctionException(
"The input function for the execute function request is null");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java
index f5d5cf2..150b0ae 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/execute/ServerRegionFunctionExecutor.java
@@ -14,7 +14,6 @@
*/
package org.apache.geode.internal.cache.execute;
-import java.util.List;
import java.util.Set;
import org.apache.logging.log4j.Logger;
@@ -41,8 +40,7 @@ import org.apache.geode.internal.logging.LogService;
* @see FunctionService#onRegion(Region) *
* @since GemFire 5.8 LA
*/
-public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
- extends AbstractExecution<ArgumentT, ReturnT, AggregatorT> {
+public class ServerRegionFunctionExecutor extends AbstractExecution {
private static final Logger logger = LogService.getLogger();
private final LocalRegion region;
@@ -115,29 +113,28 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withFilter(Set fltr) {
+ public Execution withFilter(Set fltr) {
if (fltr == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"filter"));
}
this.executeOnBucketSet = false;
- return new ServerRegionFunctionExecutor<>(this, fltr);
+ return new ServerRegionFunctionExecutor(this, fltr);
}
@Override
- public InternalExecution<ArgumentT, ReturnT, AggregatorT> withBucketFilter(
- Set<Integer> bucketIDs) {
+ public InternalExecution withBucketFilter(Set<Integer> bucketIDs) {
if (bucketIDs == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"buckets as filter"));
}
- return new ServerRegionFunctionExecutor<>(this, bucketIDs, true /* execute on bucketset */);
+ return new ServerRegionFunctionExecutor(this, bucketIDs, true /* execute on bucketset */);
}
@Override
- protected ResultCollector<ReturnT, AggregatorT> executeFunction(final Function function) {
+ protected ResultCollector executeFunction(final Function function) {
byte hasResult = 0;
try {
if (proxyCache != null) {
@@ -150,7 +147,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
if (function.hasResult()) { // have Results
hasResult = 1;
if (this.rc == null) { // Default Result Collector
- ResultCollector<Object, List<Object>> defaultCollector = new DefaultResultCollector();
+ ResultCollector defaultCollector = new DefaultResultCollector();
return executeOnServer(function, defaultCollector, hasResult);
} else { // Custom Result Collector
return executeOnServer(function, this.rc, hasResult);
@@ -164,8 +161,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
}
- protected ResultCollector<ReturnT, AggregatorT> executeFunction(final String functionId,
- boolean resultReq,
+ protected ResultCollector executeFunction(final String functionId, boolean resultReq,
boolean isHA, boolean optimizeForWrite) {
try {
if (proxyCache != null) {
@@ -178,7 +174,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
if (resultReq) { // have Results
hasResult = 1;
if (this.rc == null) { // Default Result Collector
- ResultCollector<Object, List<Object>> defaultCollector = new DefaultResultCollector();
+ ResultCollector defaultCollector = new DefaultResultCollector();
return executeOnServer(functionId, defaultCollector, hasResult, isHA, optimizeForWrite);
} else { // Custom Result Collector
return executeOnServer(functionId, this.rc, hasResult, isHA, optimizeForWrite);
@@ -192,8 +188,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
}
- private ResultCollector<ReturnT, AggregatorT> executeOnServer(Function function,
- ResultCollector collector,
+ private ResultCollector executeOnServer(Function function, ResultCollector collector,
byte hasResult) throws FunctionException {
ServerRegionProxy srp = getServerRegionProxy();
FunctionStats stats = FunctionStats.getFunctionStats(function.getId(), this.region.getSystem());
@@ -282,10 +277,13 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
return srp;
} else {
- String message = srp + ": "
- + "No available connection was found. Server Region Proxy is not available for this region "
- + region.getName();
- throw new FunctionException(message);
+ StringBuilder message = new StringBuilder();
+ message.append(srp).append(": ");
+ message
+ .append(
+ "No available connection was found. Server Region Proxy is not available for this region ")
+ .append(region.getName());
+ throw new FunctionException(message.toString());
}
}
@@ -295,45 +293,44 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
@Override
public String toString() {
- return "[ ServerRegionExecutor:" + "args=" + this.args
- + " ;filter=" + this.filter + " ;region=" + this.region.getName()
- + "]";
+ return new StringBuffer().append("[ ServerRegionExecutor:").append("args=").append(this.args)
+ .append(" ;filter=").append(this.filter).append(" ;region=").append(this.region.getName())
+ .append("]").toString();
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> setArguments(Object args) {
+ public Execution setArguments(Object args) {
if (args == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"args"));
}
- return new ServerRegionFunctionExecutor<>(this, args);
+ return new ServerRegionFunctionExecutor(this, args);
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withArgs(Object args) {
+ public Execution withArgs(Object args) {
return setArguments(args);
}
@Override
- public Execution<ArgumentT, ReturnT, AggregatorT> withCollector(ResultCollector rs) {
+ public Execution withCollector(ResultCollector rs) {
if (rs == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"Result Collector"));
}
- return new ServerRegionFunctionExecutor<>(this, rs);
+ return new ServerRegionFunctionExecutor(this, rs);
}
@Override
- public InternalExecution<ArgumentT, ReturnT, AggregatorT> withMemberMappedArgument(
- MemberMappedArgument argument) {
+ public InternalExecution withMemberMappedArgument(MemberMappedArgument argument) {
if (argument == null) {
throw new FunctionException(
String.format("The input %s for the execute function request is null",
"MemberMappedArgument"));
}
- return new ServerRegionFunctionExecutor<>(this, argument);
+ return new ServerRegionFunctionExecutor(this, argument);
}
@Override
@@ -347,7 +344,7 @@ public class ServerRegionFunctionExecutor<ArgumentT, ReturnT, AggregatorT>
}
@Override
- public ResultCollector<ReturnT, AggregatorT> execute(final String functionName) {
+ public ResultCollector execute(final String functionName) {
if (functionName == null) {
throw new FunctionException(
"The input function for the execute function request is null");
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/WindowedExporter.java b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/WindowedExporter.java
index 72e9105..52cd434 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/WindowedExporter.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/snapshot/WindowedExporter.java
@@ -32,7 +32,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.cache.Region;
-import org.apache.geode.cache.execute.Execution;
import org.apache.geode.cache.execute.FunctionContext;
import org.apache.geode.cache.execute.FunctionException;
import org.apache.geode.cache.execute.FunctionService;
@@ -77,17 +76,15 @@ public class WindowedExporter<K, V> implements Exporter<K, V> {
SnapshotPacket last = new SnapshotPacket();
DistributedMember me = region.getCache().getDistributedSystem().getDistributedMember();
- WindowedArgs<K, V> args = new WindowedArgs<>(me, options);
+ WindowedArgs<K, V> args = new WindowedArgs<K, V>(me, options);
WindowedExportCollector results = new WindowedExportCollector(local, last);
try {
// Since the ExportCollector already is a LocalResultsCollector it's ok not
// to keep the reference to the ResultsCollector returned from execute().
// Normally discarding the reference can cause issues if GC causes the
// weak ref in ProcessorKeeper21 to be collected!!
- Execution<Object, Object, BlockingQueue<SnapshotPacket>> onRegion =
- FunctionService.onRegion(region);
- InternalExecution exec =
- (InternalExecution) onRegion.setArguments(args).withCollector(results);
+ InternalExecution exec = (InternalExecution) FunctionService.onRegion(region)
+ .setArguments(args).withCollector(results);
// Ensure that our collector gets all exceptions so we can shut down the
// queue properly.
@@ -189,7 +186,7 @@ public class WindowedExporter<K, V> implements Exporter<K, V> {
try {
int bufferSize = 0;
- List<SnapshotRecord> buffer = new ArrayList<>();
+ List<SnapshotRecord> buffer = new ArrayList<SnapshotRecord>();
DistributedMember me = region.getCache().getDistributedSystem().getDistributedMember();
for (Iterator<Entry<K, V>> iter = region.entrySet().iterator(); iter.hasNext()
&& !window.isAborted();) {
@@ -277,10 +274,10 @@ public class WindowedExporter<K, V> implements Exporter<K, V> {
this.end = end;
done = new AtomicBoolean(false);
- members = new ConcurrentHashMap<>();
+ members = new ConcurrentHashMap<DistributedMember, Integer>();
// cannot bound queue to exert back pressure
- entries = new LinkedBlockingQueue<>();
+ entries = new LinkedBlockingQueue<SnapshotPacket>();
}
@Override
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/MemberConfigManager.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/MemberConfigManager.java
index e6ed0ff..eb6549a 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/MemberConfigManager.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/mutators/MemberConfigManager.java
@@ -97,11 +97,9 @@ public class MemberConfigManager implements ConfigurationManager<MemberConfig> {
private ArrayList<MemberInformation> getMemberInformation(
Set<DistributedMember> distributedMembers) {
- Execution<DistributedMember, MemberInformation, ArrayList<MemberInformation>> execution =
- FunctionService.onMembers(distributedMembers);
- ResultCollector<MemberInformation, ArrayList<MemberInformation>> resultCollector =
- execution.execute(new GetMemberInformationFunction());
- return resultCollector.getResult();
+ Execution execution = FunctionService.onMembers(distributedMembers);
+ ResultCollector<?, ?> resultCollector = execution.execute(new GetMemberInformationFunction());
+ return (ArrayList<MemberInformation>) resultCollector.getResult();
}
@VisibleForTesting
diff --git a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java
index 88fc336..23eecda 100644
--- a/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java
+++ b/geode-core/src/main/java/org/apache/geode/management/internal/configuration/realizers/RegionConfigRealizer.java
@@ -62,8 +62,7 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
* @param regionPath this is the full path of the region
*/
public void create(RegionConfig regionConfig, String regionPath, Cache cache) {
- RegionFactory<Object, Object> factory =
- getRegionFactory(cache, regionConfig.getRegionAttributes());
+ RegionFactory factory = getRegionFactory(cache, regionConfig.getRegionAttributes());
RegionPath regionPathData = new RegionPath(regionPath);
String regionName = regionPathData.getName();
String parentRegionPath = regionPathData.getParent();
@@ -76,9 +75,8 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
factory.createSubregion(parentRegion, regionName);
}
- private <K, V> RegionFactory<K, V> getRegionFactory(Cache cache,
- RegionAttributesType regionAttributes) {
- RegionFactory<K, V> factory = cache.createRegionFactory();
+ private RegionFactory getRegionFactory(Cache cache, RegionAttributesType regionAttributes) {
+ RegionFactory factory = cache.createRegionFactory();
factory.setDataPolicy(DataPolicy.fromString(regionAttributes.getDataPolicy().name()));
@@ -87,13 +85,13 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
}
if (regionAttributes.getCacheLoader() != null) {
- factory
+ ((RegionFactory<Object, Object>) factory)
.setCacheLoader(DeclarableTypeInstantiator.newInstance(regionAttributes.getCacheLoader(),
cache));
}
if (regionAttributes.getCacheWriter() != null) {
- factory
+ ((RegionFactory<Object, Object>) factory)
.setCacheWriter(DeclarableTypeInstantiator.newInstance(regionAttributes.getCacheWriter(),
cache));
}
@@ -104,25 +102,25 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
for (int i = 0; i < configListeners.size(); i++) {
listeners[i] = DeclarableTypeInstantiator.newInstance(configListeners.get(i), cache);
}
- factory.initCacheListeners(listeners);
+ ((RegionFactory<Object, Object>) factory).initCacheListeners(listeners);
}
final String keyConstraint = regionAttributes.getKeyConstraint();
final String valueConstraint = regionAttributes.getValueConstraint();
if (keyConstraint != null && !keyConstraint.isEmpty()) {
- Class<K> keyConstraintClass =
+ Class<Object> keyConstraintClass =
CliUtil.forName(keyConstraint, CliStrings.CREATE_REGION__KEYCONSTRAINT);
- factory.setKeyConstraint(keyConstraintClass);
+ ((RegionFactory<Object, Object>) factory).setKeyConstraint(keyConstraintClass);
}
if (valueConstraint != null && !valueConstraint.isEmpty()) {
- Class<V> valueConstraintClass =
+ Class<Object> valueConstraintClass =
CliUtil.forName(valueConstraint, CliStrings.CREATE_REGION__VALUECONSTRAINT);
- factory.setValueConstraint(valueConstraintClass);
+ ((RegionFactory<Object, Object>) factory).setValueConstraint(valueConstraintClass);
}
if (regionAttributes.getCompressor() != null) {
- factory
+ ((RegionFactory<Object, Object>) factory)
.setCompressor(DeclarableTypeInstantiator.newInstance(regionAttributes.getCompressor()));
}
@@ -134,13 +132,13 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
if (regionAttributes.getEntryIdleTime() != null) {
RegionAttributesType.ExpirationAttributesType eitl = regionAttributes.getEntryIdleTime();
- factory.setEntryIdleTimeout(
+ ((RegionFactory<Object, Object>) factory).setEntryIdleTimeout(
new ExpirationAttributes(Integer.valueOf(eitl.getTimeout()),
ExpirationAction.fromXmlString(eitl.getAction())));
if (eitl.getCustomExpiry() != null) {
- factory.setCustomEntryIdleTimeout(
+ ((RegionFactory<Object, Object>) factory).setCustomEntryIdleTimeout(
DeclarableTypeInstantiator.newInstance(eitl.getCustomExpiry(),
cache));
}
@@ -148,12 +146,12 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
if (regionAttributes.getEntryTimeToLive() != null) {
RegionAttributesType.ExpirationAttributesType ettl = regionAttributes.getEntryTimeToLive();
- factory.setEntryTimeToLive(
+ ((RegionFactory<Object, Object>) factory).setEntryTimeToLive(
new ExpirationAttributes(Integer.valueOf(ettl.getTimeout()),
ExpirationAction.fromXmlString(ettl.getAction())));
if (ettl.getCustomExpiry() != null) {
- factory
+ ((RegionFactory<Object, Object>) factory)
.setCustomEntryTimeToLive(DeclarableTypeInstantiator.newInstance(ettl.getCustomExpiry(),
cache));
}
@@ -161,14 +159,14 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
if (regionAttributes.getRegionIdleTime() != null) {
RegionAttributesType.ExpirationAttributesType ritl = regionAttributes.getRegionIdleTime();
- factory.setRegionIdleTimeout(
+ ((RegionFactory<Object, Object>) factory).setRegionIdleTimeout(
new ExpirationAttributes(Integer.valueOf(ritl.getTimeout()),
ExpirationAction.fromXmlString(ritl.getAction())));
}
if (regionAttributes.getRegionTimeToLive() != null) {
RegionAttributesType.ExpirationAttributesType rttl = regionAttributes.getRegionTimeToLive();
- factory.setRegionTimeToLive(
+ ((RegionFactory<Object, Object>) factory).setRegionTimeToLive(
new ExpirationAttributes(Integer.valueOf(rttl.getTimeout()),
ExpirationAction.fromXmlString(rttl.getAction())));
}
@@ -209,12 +207,12 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
if (regionAttributes.getGatewaySenderIds() != null) {
Arrays.stream(regionAttributes.getGatewaySenderIds().split(","))
- .forEach(factory::addGatewaySenderId);
+ .forEach(gsi -> factory.addGatewaySenderId(gsi));
}
if (regionAttributes.getAsyncEventQueueIds() != null) {
Arrays.stream(regionAttributes.getAsyncEventQueueIds().split(","))
- .forEach(factory::addAsyncEventQueueId);
+ .forEach(gsi -> factory.addAsyncEventQueueId(gsi));
}
factory.setConcurrencyChecksEnabled(regionAttributes.isConcurrencyChecksEnabled());
@@ -233,7 +231,7 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
return factory;
}
- private PartitionAttributesImpl convertToRegionFactoryPartitionAttributes(
+ PartitionAttributesImpl convertToRegionFactoryPartitionAttributes(
RegionAttributesType.PartitionAttributes configAttributes, Cache cache) {
PartitionAttributesImpl partitionAttributes = new PartitionAttributesImpl();
if (configAttributes == null) {
@@ -278,9 +276,9 @@ public class RegionConfigRealizer implements ConfigurationRealizer<RegionConfig>
if (configAttributes.getPartitionListeners() != null) {
List<DeclarableType> configListeners = configAttributes.getPartitionListeners();
- for (DeclarableType configListener : configListeners) {
+ for (int i = 0; i < configListeners.size(); i++) {
partitionAttributes.addPartitionListener(
- DeclarableTypeInstantiator.newInstance(configListener, cache));
+ DeclarableTypeInstantiator.newInstance(configListeners.get(i), cache));
}
}