You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by pi...@apache.org on 2018/03/09 17:28:44 UTC
[geode] branch develop updated: GEODE-4751: Modify experimental
driver to support function execution. (#1579)
This is an automated email from the ASF dual-hosted git repository.
pivotalsarge pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push:
new 8672678 GEODE-4751: Modify experimental driver to support function execution. (#1579)
8672678 is described below
commit 8672678b7906a16a8140b9986f8c1f825414498b
Author: Michael "Sarge" Dodge <md...@pivotal.io>
AuthorDate: Fri Mar 9 09:28:41 2018 -0800
GEODE-4751: Modify experimental driver to support function execution. (#1579)
* GEODE-4751: Modify experimental driver to support function execution.
- Introduce function and function service interfaces.
- Enhance driver to produce function service.
* GEODE-4751: Address comments.
* GEODE-4751: More spotless.
---
.../apache/geode/experimental/driver/Driver.java | 12 ++++
.../apache/geode/experimental/driver/Function.java | 59 +++++++++++++++
.../driver/{Query.java => FunctionService.java} | 20 ++++--
.../geode/experimental/driver/ProtobufDriver.java | 5 ++
.../experimental/driver/ProtobufFunction.java | 83 ++++++++++++++++++++++
.../{Query.java => ProtobufFunctionService.java} | 13 ++--
.../apache/geode/experimental/driver/Query.java | 14 +++-
.../geode/experimental/driver/QueryService.java | 12 +++-
.../driver/FunctionServiceIntegrationTest.java | 60 ++++++++++++++++
.../experimental/driver/IntegrationTestBase.java | 35 ++++++++-
10 files changed, 300 insertions(+), 13 deletions(-)
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java
index 65d8e37..57a4d05 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java
@@ -48,9 +48,21 @@ public interface Driver {
*/
<K, V> Region<K, V> getRegion(String regionName);
+ /**
+ * Creates a new query service or retrieves an extant query service.
+ *
+ * @return Query service.
+ */
QueryService getQueryService();
/**
+ * Creates a new function service or retrieves an extant function service.
+ *
+ * @return Function service.
+ */
+ FunctionService getFunctionService();
+
+ /**
* Close this Driver, rendering it useless
*/
void close();
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Function.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Function.java
new file mode 100644
index 0000000..7dd6bbd
--- /dev/null
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Function.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.experimental.driver;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This interface represents, from the client side, the execution of a function on the server side.
+ *
+ * @param <T> Type of result returned from the function. May differ from the key and value types
+ * of any involved regions.
+ */
+public interface Function<T> {
+ /**
+ * Executes this function with the specified arguments on the specified region.
+ * Keys may be optionally filtered.
+ *
+ * @param arguments Object encapsulating all the arguments to this execution of the function.
+ * @param regionName Name of the region.
+ * @param keyFilters Optional list of keys.
+ * @return Possibly empty list of results from the function.
+ * @throws IOException If an error occurred communicating with the distributed system.
+ */
+ List<T> executeOnRegion(Object arguments, String regionName, Object... keyFilters)
+ throws IOException;
+
+ /**
+ * Executes this function with the specified arguments on the specified members.
+ *
+ * @param arguments Object encapsulating all the arguments to this execution of the function.
+ * @param members Optional list of member names.
+ * @return Possibly empty list of results from the function.
+ * @throws IOException If an error occurred communicating with the distributed system.
+ */
+ List<T> executeOnMember(Object arguments, String... members) throws IOException;
+
+ /**
+ * Executes this function with the specified arguments on the specified groups.
+ *
+ * @param arguments Object encapsulating all the arguments to this execution of the function.
+ * @param groups Optional list of group names.
+ * @return Possibly empty list of results from the function.
+ * @throws IOException If an error occurred communicating with the distributed system.
+ */
+ List<T> executeOnGroup(Object arguments, String... groups) throws IOException;
+}
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/FunctionService.java
similarity index 60%
copy from geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
copy to geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/FunctionService.java
index fd0c1f4..3d928f2 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/FunctionService.java
@@ -14,10 +14,18 @@
*/
package org.apache.geode.experimental.driver;
-import java.io.IOException;
-import java.util.List;
-
-public interface Query<T> {
-
- List<T> execute(Object... bindParameters) throws IOException;
+/**
+ * This interface abstracts the creation of function abstractions.
+ */
+public interface FunctionService {
+ /**
+ * Creates a new object that allows the execution of functions in the distributed system.
+ *
+ * @param functionId Unique ID of the function that has already been registered in the distributed
+ * system.
+ * @param <T> Type of result returned from the function. May differ from the key and value types
+ * of any involved regions.
+ * @return Function abstraction.
+ */
+ <T> Function newFunction(String functionId);
}
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
index 9b233f7..9e8e8f9 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
@@ -78,6 +78,11 @@ public class ProtobufDriver implements Driver {
}
@Override
+ public FunctionService getFunctionService() {
+ return new ProtobufFunctionService(channel);
+ }
+
+ @Override
public void close() {
try {
final Message disconnectClientRequest = ClientProtocol.Message.newBuilder()
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunction.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunction.java
new file mode 100644
index 0000000..186ce8f
--- /dev/null
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.experimental.driver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
+
+public class ProtobufFunction<T> implements Function<T> {
+ private final String functionId;
+ private final ProtobufChannel channel;
+ private Set<String> members;
+ private Set<String> groups;
+
+ public ProtobufFunction(String functionId, ProtobufChannel channel) {
+ this.functionId = functionId;
+ this.channel = channel;
+ }
+
+ @Override
+ public List<T> executeOnRegion(Object arguments, String regionName, Object... keyFilters)
+ throws IOException {
+ List<BasicTypes.EncodedValue> encodedFilters = Arrays.asList(keyFilters).stream()
+ .map(ValueEncoder::encodeValue).collect(Collectors.toList());
+ ClientProtocol.Message request = ClientProtocol.Message.newBuilder()
+ .setExecuteFunctionOnRegionRequest(FunctionAPI.ExecuteFunctionOnRegionRequest.newBuilder()
+ .setRegion(regionName).addAllKeyFilter(encodedFilters).setFunctionID(functionId))
+ .build();
+ final FunctionAPI.ExecuteFunctionOnRegionResponse response = channel
+ .sendRequest(request,
+ ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONREGIONRESPONSE)
+ .getExecuteFunctionOnRegionResponse();
+ return response.getResultsList().stream().map(value -> (T) ValueEncoder.decodeValue(value))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<T> executeOnMember(Object arguments, String... members) throws IOException {
+ final List<String> stringMembers = Arrays.asList(members);
+ ClientProtocol.Message request = ClientProtocol.Message.newBuilder()
+ .setExecuteFunctionOnMemberRequest(FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder()
+ .addAllMemberName(stringMembers).setFunctionID(functionId))
+ .build();
+ final FunctionAPI.ExecuteFunctionOnMemberResponse response = channel
+ .sendRequest(request,
+ ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONMEMBERRESPONSE)
+ .getExecuteFunctionOnMemberResponse();
+ return response.getResultsList().stream().map(value -> (T) ValueEncoder.decodeValue(value))
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public List<T> executeOnGroup(Object arguments, String... groups) throws IOException {
+ final List<String> stringGroups = Arrays.asList(groups);
+ ClientProtocol.Message request = ClientProtocol.Message.newBuilder()
+ .setExecuteFunctionOnGroupRequest(FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder()
+ .addAllGroupName(stringGroups).setFunctionID(functionId))
+ .build();
+ final FunctionAPI.ExecuteFunctionOnGroupResponse response = channel
+ .sendRequest(request, ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPRESPONSE)
+ .getExecuteFunctionOnGroupResponse();
+ return response.getResultsList().stream().map(value -> (T) ValueEncoder.decodeValue(value))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunctionService.java
similarity index 72%
copy from geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
copy to geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunctionService.java
index fd0c1f4..feedaec 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunctionService.java
@@ -14,10 +14,15 @@
*/
package org.apache.geode.experimental.driver;
-import java.io.IOException;
-import java.util.List;
+public class ProtobufFunctionService implements FunctionService {
+ private final ProtobufChannel channel;
-public interface Query<T> {
+ public ProtobufFunctionService(ProtobufChannel channel) {
+ this.channel = channel;
+ }
- List<T> execute(Object... bindParameters) throws IOException;
+ @Override
+ public <T> Function newFunction(String functionId) {
+ return new ProtobufFunction<T>(functionId, channel);
+ }
}
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
index fd0c1f4..5179bd2 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
@@ -17,7 +17,19 @@ package org.apache.geode.experimental.driver;
import java.io.IOException;
import java.util.List;
+/**
+ * This interface represents, from the client side, the execution of a query on the server side.
+ *
+ * @param <T> Type of result returned from the query. May differ from the key and value types
+ * of any involved regions.
+ */
public interface Query<T> {
-
+ /**
+ * Executes this query with the specified parameters.
+ *
+ * @param bindParameters Optional list of bind parameters.
+ * @return Possibly empty list of results from the query.
+ * @throws IOException If an error occurred communicating with the distributed system.
+ */
List<T> execute(Object... bindParameters) throws IOException;
}
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java
index 5f1f89c..08a8e71 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java
@@ -14,7 +14,17 @@
*/
package org.apache.geode.experimental.driver;
+/**
+ * This interface abstracts the creation of query abstractions.
+ */
public interface QueryService {
-
+ /**
+ * Creates a new object that allows the execution of queries in the distributed system.
+ *
+ * @param queryString OQL query.
+ * @param <T> Type of result returned from the query. May differ from the key and value types
+ * of any involved regions.
+ * @return Query abstraction.
+ */
<T> Query newQuery(String queryString);
}
diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/FunctionServiceIntegrationTest.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/FunctionServiceIntegrationTest.java
new file mode 100644
index 0000000..34b3e93
--- /dev/null
+++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/FunctionServiceIntegrationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.experimental.driver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class FunctionServiceIntegrationTest extends IntegrationTestBase {
+ @Test
+ public void testFunctionOnRegion() throws IOException {
+ FunctionService service = driver.getFunctionService();
+
+ Function<String> function = service.newFunction(FUNCTION_ID);
+ final List<String> results = function.executeOnRegion(null, serverRegion.getName());
+
+ assertEquals(Arrays.asList("first result", "next result", "last result"), results);
+ }
+
+ @Test
+ public void testFunctionOnMember() throws IOException {
+ FunctionService service = driver.getFunctionService();
+
+ Function<String> function = service.newFunction(FUNCTION_ID);
+ final List<String> results = function.executeOnMember(null, NAME);
+
+ assertEquals(Arrays.asList("first result", "next result", "last result"), results);
+ }
+
+ @Test
+ public void testFunctionOnGroup() throws IOException {
+ FunctionService service = driver.getFunctionService();
+
+ Function<String> function = service.newFunction(FUNCTION_ID);
+ final List<String> results = function.executeOnGroup(null, GROUP);
+
+ assertEquals(Arrays.asList("first result", "next result", "last result"), results);
+ }
+}
diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java
index 7b8fd01..529a7ec 100644
--- a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java
+++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java
@@ -14,7 +14,11 @@
*/
package org.apache.geode.experimental.driver;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
import java.util.Properties;
+import java.util.Set;
import org.junit.After;
import org.junit.Before;
@@ -23,7 +27,12 @@ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
import org.apache.geode.cache.Cache;
import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
import org.apache.geode.cache.server.CacheServer;
import org.apache.geode.distributed.ConfigurationProperties;
import org.apache.geode.distributed.Locator;
@@ -32,7 +41,11 @@ import org.apache.geode.distributed.Locator;
* Created by dan on 2/23/18.
*/
public class IntegrationTestBase {
- private static final String REGION = "region";
+ protected static final String NAME = "name";
+ protected static final String GROUP = "group";
+ protected static final String REGION = "region";
+ protected static final String FUNCTION_ID = "function";
+
@Rule
public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
protected Driver driver;
@@ -47,6 +60,8 @@ public class IntegrationTestBase {
// Create a cache
CacheFactory cf = new CacheFactory();
cf.set(ConfigurationProperties.MCAST_PORT, "0");
+ cf.set(ConfigurationProperties.NAME, NAME);
+ cf.set(ConfigurationProperties.GROUPS, GROUP);
cache = cf.create();
// Start a locator
@@ -61,6 +76,9 @@ public class IntegrationTestBase {
// Create a region
serverRegion = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION);
+ // Register a function
+ org.apache.geode.cache.execute.FunctionService.registerFunction(new TestFunction());
+
// Create a driver connected to the server
driver = new DriverFactory().addLocator("localhost", locatorPort).create();
}
@@ -70,4 +88,19 @@ public class IntegrationTestBase {
locator.stop();
cache.close();
}
+
+ class TestFunction implements org.apache.geode.cache.execute.Function {
+ @Override
+ public String getId() {
+ return FUNCTION_ID;
+ }
+
+ @Override
+ public void execute(FunctionContext context) {
+ final ResultSender resultSender = context.getResultSender();
+ resultSender.sendResult("first result");
+ resultSender.sendResult("next result");
+ resultSender.lastResult("last result");
+ }
+ }
}
--
To stop receiving notification emails like this one, please contact
pivotalsarge@apache.org.