You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@geode.apache.org by pi...@apache.org on 2018/03/09 17:28:44 UTC

[geode] branch develop updated: GEODE-4751: Modify experimental driver to support function execution. (#1579)

This is an automated email from the ASF dual-hosted git repository.

pivotalsarge pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 8672678  GEODE-4751: Modify experimental driver to support function execution. (#1579)
8672678 is described below

commit 8672678b7906a16a8140b9986f8c1f825414498b
Author: Michael "Sarge" Dodge <md...@pivotal.io>
AuthorDate: Fri Mar 9 09:28:41 2018 -0800

    GEODE-4751: Modify experimental driver to support function execution. (#1579)
    
    * GEODE-4751: Modify experimental driver to support function execution.
    
    - Introduce function and function service interfaces.
    - Enhance driver to produce function service.
    
    * GEODE-4751: Address comments.
    
    * GEODE-4751: More spotless.
---
 .../apache/geode/experimental/driver/Driver.java   | 12 ++++
 .../apache/geode/experimental/driver/Function.java | 59 +++++++++++++++
 .../driver/{Query.java => FunctionService.java}    | 20 ++++--
 .../geode/experimental/driver/ProtobufDriver.java  |  5 ++
 .../experimental/driver/ProtobufFunction.java      | 83 ++++++++++++++++++++++
 .../{Query.java => ProtobufFunctionService.java}   | 13 ++--
 .../apache/geode/experimental/driver/Query.java    | 14 +++-
 .../geode/experimental/driver/QueryService.java    | 12 +++-
 .../driver/FunctionServiceIntegrationTest.java     | 60 ++++++++++++++++
 .../experimental/driver/IntegrationTestBase.java   | 35 ++++++++-
 10 files changed, 300 insertions(+), 13 deletions(-)

diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java
index 65d8e37..57a4d05 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Driver.java
@@ -48,9 +48,21 @@ public interface Driver {
    */
   <K, V> Region<K, V> getRegion(String regionName);
 
+  /**
+   * Creates a new query service or retrieves an extant query service.
+   *
+   * @return Query service.
+   */
   QueryService getQueryService();
 
   /**
+   * Creates a new function service or retrieves an extant function service.
+   *
+   * @return Function service.
+   */
+  FunctionService getFunctionService();
+
+  /**
    * Close this Driver, rendering it useless
    */
   void close();
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Function.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Function.java
new file mode 100644
index 0000000..7dd6bbd
--- /dev/null
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Function.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.experimental.driver;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * This interface represents, from the client side, the execution of a function on the server side.
+ *
+ * @param <T> Type of result returned from the function. May differ from the key and value types
+ *        of any involved regions.
+ */
+public interface Function<T> {
+  /**
+   * Executes this function with the specified arguments on the specified region.
+   * Keys may be optionally filtered.
+   *
+   * @param arguments Object encapsulating all the arguments to this execution of the function.
+   * @param regionName Name of the region.
+   * @param keyFilters Optional list of keys.
+   * @return Possibly empty list of results from the function.
+   * @throws IOException If an error occurred communicating with the distributed system.
+   */
+  List<T> executeOnRegion(Object arguments, String regionName, Object... keyFilters)
+      throws IOException;
+
+  /**
+   * Executes this function with the specified arguments on the specified members.
+   *
+   * @param arguments Object encapsulating all the arguments to this execution of the function.
+   * @param members Optional list of member names.
+   * @return Possibly empty list of results from the function.
+   * @throws IOException If an error occurred communicating with the distributed system.
+   */
+  List<T> executeOnMember(Object arguments, String... members) throws IOException;
+
+  /**
+   * Executes this function with the specified arguments on the specified groups.
+   *
+   * @param arguments Object encapsulating all the arguments to this execution of the function.
+   * @param groups Optional list of group names.
+   * @return Possibly empty list of results from the function.
+   * @throws IOException If an error occurred communicating with the distributed system.
+   */
+  List<T> executeOnGroup(Object arguments, String... groups) throws IOException;
+}
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/FunctionService.java
similarity index 60%
copy from geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
copy to geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/FunctionService.java
index fd0c1f4..3d928f2 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/FunctionService.java
@@ -14,10 +14,18 @@
  */
 package org.apache.geode.experimental.driver;
 
-import java.io.IOException;
-import java.util.List;
-
-public interface Query<T> {
-
-  List<T> execute(Object... bindParameters) throws IOException;
+/**
+ * This interface abstracts the creation of function abstractions.
+ */
+public interface FunctionService {
+  /**
+   * Creates a new object that allows the execution of functions in the distributed system.
+   *
+   * @param functionId Unique ID of the function that has already been registered in the distributed
+   *        system.
+   * @param <T> Type of result returned from the function. May differ from the key and value types
+   *        of any involved regions.
+   * @return Function abstraction.
+   */
+  <T> Function newFunction(String functionId);
 }
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
index 9b233f7..9e8e8f9 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufDriver.java
@@ -78,6 +78,11 @@ public class ProtobufDriver implements Driver {
   }
 
   @Override
+  public FunctionService getFunctionService() {
+    return new ProtobufFunctionService(channel);
+  }
+
+  @Override
   public void close() {
     try {
       final Message disconnectClientRequest = ClientProtocol.Message.newBuilder()
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunction.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunction.java
new file mode 100644
index 0000000..186ce8f
--- /dev/null
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunction.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.experimental.driver;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
+
+public class ProtobufFunction<T> implements Function<T> {
+  private final String functionId;
+  private final ProtobufChannel channel;
+  private Set<String> members;
+  private Set<String> groups;
+
+  public ProtobufFunction(String functionId, ProtobufChannel channel) {
+    this.functionId = functionId;
+    this.channel = channel;
+  }
+
+  @Override
+  public List<T> executeOnRegion(Object arguments, String regionName, Object... keyFilters)
+      throws IOException {
+    List<BasicTypes.EncodedValue> encodedFilters = Arrays.asList(keyFilters).stream()
+        .map(ValueEncoder::encodeValue).collect(Collectors.toList());
+    ClientProtocol.Message request = ClientProtocol.Message.newBuilder()
+        .setExecuteFunctionOnRegionRequest(FunctionAPI.ExecuteFunctionOnRegionRequest.newBuilder()
+            .setRegion(regionName).addAllKeyFilter(encodedFilters).setFunctionID(functionId))
+        .build();
+    final FunctionAPI.ExecuteFunctionOnRegionResponse response = channel
+        .sendRequest(request,
+            ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONREGIONRESPONSE)
+        .getExecuteFunctionOnRegionResponse();
+    return response.getResultsList().stream().map(value -> (T) ValueEncoder.decodeValue(value))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<T> executeOnMember(Object arguments, String... members) throws IOException {
+    final List<String> stringMembers = Arrays.asList(members);
+    ClientProtocol.Message request = ClientProtocol.Message.newBuilder()
+        .setExecuteFunctionOnMemberRequest(FunctionAPI.ExecuteFunctionOnMemberRequest.newBuilder()
+            .addAllMemberName(stringMembers).setFunctionID(functionId))
+        .build();
+    final FunctionAPI.ExecuteFunctionOnMemberResponse response = channel
+        .sendRequest(request,
+            ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONMEMBERRESPONSE)
+        .getExecuteFunctionOnMemberResponse();
+    return response.getResultsList().stream().map(value -> (T) ValueEncoder.decodeValue(value))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public List<T> executeOnGroup(Object arguments, String... groups) throws IOException {
+    final List<String> stringGroups = Arrays.asList(groups);
+    ClientProtocol.Message request = ClientProtocol.Message.newBuilder()
+        .setExecuteFunctionOnGroupRequest(FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder()
+            .addAllGroupName(stringGroups).setFunctionID(functionId))
+        .build();
+    final FunctionAPI.ExecuteFunctionOnGroupResponse response = channel
+        .sendRequest(request, ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPRESPONSE)
+        .getExecuteFunctionOnGroupResponse();
+    return response.getResultsList().stream().map(value -> (T) ValueEncoder.decodeValue(value))
+        .collect(Collectors.toList());
+  }
+}
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunctionService.java
similarity index 72%
copy from geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
copy to geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunctionService.java
index fd0c1f4..feedaec 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/ProtobufFunctionService.java
@@ -14,10 +14,15 @@
  */
 package org.apache.geode.experimental.driver;
 
-import java.io.IOException;
-import java.util.List;
+public class ProtobufFunctionService implements FunctionService {
+  private final ProtobufChannel channel;
 
-public interface Query<T> {
+  public ProtobufFunctionService(ProtobufChannel channel) {
+    this.channel = channel;
+  }
 
-  List<T> execute(Object... bindParameters) throws IOException;
+  @Override
+  public <T> Function newFunction(String functionId) {
+    return new ProtobufFunction<T>(functionId, channel);
+  }
 }
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
index fd0c1f4..5179bd2 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/Query.java
@@ -17,7 +17,19 @@ package org.apache.geode.experimental.driver;
 import java.io.IOException;
 import java.util.List;
 
+/**
+ * This interface represents, from the client side, the execution of a query on the server side.
+ *
+ * @param <T> Type of result returned from the query. May differ from the key and value types
+ *        of any involved regions.
+ */
 public interface Query<T> {
-
+  /**
+   * Executes this query with the specified parameters.
+   *
+   * @param bindParameters Optional list of bind parameters.
+   * @return Possibly empty list of results from the query.
+   * @throws IOException If an error occurred communicating with the distributed system.
+   */
   List<T> execute(Object... bindParameters) throws IOException;
 }
diff --git a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java
index 5f1f89c..08a8e71 100644
--- a/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java
+++ b/geode-experimental-driver/src/main/java/org/apache/geode/experimental/driver/QueryService.java
@@ -14,7 +14,17 @@
  */
 package org.apache.geode.experimental.driver;
 
+/**
+ * This interface abstracts the creation of query abstractions.
+ */
 public interface QueryService {
-
+  /**
+   * Creates a new object that allows the execution of queries in the distributed system.
+   *
+   * @param queryString OQL query.
+   * @param <T> Type of result returned from the query. May differ from the key and value types
+   *        of any involved regions.
+   * @return Query abstraction.
+   */
   <T> Query newQuery(String queryString);
 }
diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/FunctionServiceIntegrationTest.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/FunctionServiceIntegrationTest.java
new file mode 100644
index 0000000..34b3e93
--- /dev/null
+++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/FunctionServiceIntegrationTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.experimental.driver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class FunctionServiceIntegrationTest extends IntegrationTestBase {
+  @Test
+  public void testFunctionOnRegion() throws IOException {
+    FunctionService service = driver.getFunctionService();
+
+    Function<String> function = service.newFunction(FUNCTION_ID);
+    final List<String> results = function.executeOnRegion(null, serverRegion.getName());
+
+    assertEquals(Arrays.asList("first result", "next result", "last result"), results);
+  }
+
+  @Test
+  public void testFunctionOnMember() throws IOException {
+    FunctionService service = driver.getFunctionService();
+
+    Function<String> function = service.newFunction(FUNCTION_ID);
+    final List<String> results = function.executeOnMember(null, NAME);
+
+    assertEquals(Arrays.asList("first result", "next result", "last result"), results);
+  }
+
+  @Test
+  public void testFunctionOnGroup() throws IOException {
+    FunctionService service = driver.getFunctionService();
+
+    Function<String> function = service.newFunction(FUNCTION_ID);
+    final List<String> results = function.executeOnGroup(null, GROUP);
+
+    assertEquals(Arrays.asList("first result", "next result", "last result"), results);
+  }
+}
diff --git a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java
index 7b8fd01..529a7ec 100644
--- a/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java
+++ b/geode-experimental-driver/src/test/java/org/apache/geode/experimental/driver/IntegrationTestBase.java
@@ -14,7 +14,11 @@
  */
 package org.apache.geode.experimental.driver;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 import java.util.Properties;
+import java.util.Set;
 
 import org.junit.After;
 import org.junit.Before;
@@ -23,7 +27,12 @@ import org.junit.contrib.java.lang.system.RestoreSystemProperties;
 
 import org.apache.geode.cache.Cache;
 import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.Region;
 import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.RegionFunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
 import org.apache.geode.cache.server.CacheServer;
 import org.apache.geode.distributed.ConfigurationProperties;
 import org.apache.geode.distributed.Locator;
@@ -32,7 +41,11 @@ import org.apache.geode.distributed.Locator;
  * Created by dan on 2/23/18.
  */
 public class IntegrationTestBase {
-  private static final String REGION = "region";
+  protected static final String NAME = "name";
+  protected static final String GROUP = "group";
+  protected static final String REGION = "region";
+  protected static final String FUNCTION_ID = "function";
+
   @Rule
   public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties();
   protected Driver driver;
@@ -47,6 +60,8 @@ public class IntegrationTestBase {
     // Create a cache
     CacheFactory cf = new CacheFactory();
     cf.set(ConfigurationProperties.MCAST_PORT, "0");
+    cf.set(ConfigurationProperties.NAME, NAME);
+    cf.set(ConfigurationProperties.GROUPS, GROUP);
     cache = cf.create();
 
     // Start a locator
@@ -61,6 +76,9 @@ public class IntegrationTestBase {
     // Create a region
     serverRegion = cache.createRegionFactory(RegionShortcut.REPLICATE).create(REGION);
 
+    // Register a function
+    org.apache.geode.cache.execute.FunctionService.registerFunction(new TestFunction());
+
     // Create a driver connected to the server
     driver = new DriverFactory().addLocator("localhost", locatorPort).create();
   }
@@ -70,4 +88,19 @@ public class IntegrationTestBase {
     locator.stop();
     cache.close();
   }
+
+  class TestFunction implements org.apache.geode.cache.execute.Function {
+    @Override
+    public String getId() {
+      return FUNCTION_ID;
+    }
+
+    @Override
+    public void execute(FunctionContext context) {
+      final ResultSender resultSender = context.getResultSender();
+      resultSender.sendResult("first result");
+      resultSender.sendResult("next result");
+      resultSender.lastResult("last result");
+    }
+  }
 }

-- 
To stop receiving notification emails like this one, please contact
pivotalsarge@apache.org.