You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2020/02/10 03:43:37 UTC
[flink-statefun] 08/09: [FLINK-15955] Rename RemoteFunction* -> GrpcFunction*
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 5686463495365acad3af336307f79ee92d47a853
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Fri Feb 7 18:22:32 2020 +0100
[FLINK-15955] Rename RemoteFunction* -> GrpcFunction*
---
.../jsonmodule/{RemoteFunction.java => GrpcFunction.java} | 6 +++---
...moteFunctionProvider.java => GrpcFunctionProvider.java} | 10 +++++-----
.../{RemoteFunctionSpec.java => GrpcFunctionSpec.java} | 14 ++++++++++----
.../flink/statefun/flink/core/jsonmodule/JsonModule.java | 10 +++++-----
.../src/test/resources/bar-module/module.yaml | 5 +++--
5 files changed, 26 insertions(+), 19 deletions(-)
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunction.java
similarity index 88%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunction.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunction.java
index 8d24dd5..c7e7d51 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunction.java
@@ -21,10 +21,10 @@ import java.util.Objects;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;
-public class RemoteFunction implements StatefulFunction {
- private final RemoteFunctionSpec functionSpec;
+public class GrpcFunction implements StatefulFunction {
+ private final GrpcFunctionSpec functionSpec;
- public RemoteFunction(RemoteFunctionSpec functionSpec) {
+ public GrpcFunction(GrpcFunctionSpec functionSpec) {
this.functionSpec = Objects.requireNonNull(functionSpec);
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunctionProvider.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunctionProvider.java
similarity index 79%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunctionProvider.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunctionProvider.java
index f088161..4d5db34 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunctionProvider.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunctionProvider.java
@@ -22,19 +22,19 @@ import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
-public class RemoteFunctionProvider implements StatefulFunctionProvider {
- private final Map<FunctionType, RemoteFunctionSpec> supportedTypes;
+public class GrpcFunctionProvider implements StatefulFunctionProvider {
+ private final Map<FunctionType, GrpcFunctionSpec> supportedTypes;
- public RemoteFunctionProvider(Map<FunctionType, RemoteFunctionSpec> supportedTypes) {
+ public GrpcFunctionProvider(Map<FunctionType, GrpcFunctionSpec> supportedTypes) {
this.supportedTypes = supportedTypes;
}
@Override
public StatefulFunction functionOfType(FunctionType type) {
- RemoteFunctionSpec spec = supportedTypes.get(type);
+ GrpcFunctionSpec spec = supportedTypes.get(type);
if (spec == null) {
throw new IllegalArgumentException("Unsupported type " + type);
}
- return new RemoteFunction(spec);
+ return new GrpcFunction(spec);
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunctionSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunctionSpec.java
similarity index 81%
rename from statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunctionSpec.java
rename to statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunctionSpec.java
index 4cc0025..62bb4f8 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/RemoteFunctionSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/GrpcFunctionSpec.java
@@ -21,20 +21,26 @@ import java.net.SocketAddress;
import java.util.Objects;
import org.apache.flink.statefun.sdk.FunctionType;
-final class RemoteFunctionSpec {
+final class GrpcFunctionSpec implements FunctionSpec {
private final FunctionType functionType;
private final SocketAddress functionAddress;
- RemoteFunctionSpec(FunctionType functionType, SocketAddress functionAddress) {
+ GrpcFunctionSpec(FunctionType functionType, SocketAddress functionAddress) {
this.functionType = Objects.requireNonNull(functionType);
this.functionAddress = Objects.requireNonNull(functionAddress);
}
- FunctionType functionType() {
+ @Override
+ public FunctionType functionType() {
return functionType;
}
- SocketAddress address() {
+ @Override
+ public Kind kind() {
+ return Kind.GRPC;
+ }
+
+ public SocketAddress address() {
return functionAddress;
}
}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
index 3a15f76..4465864 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/JsonModule.java
@@ -64,15 +64,15 @@ final class JsonModule implements StatefulFunctionModule {
}
private void configureFunctions(Binder binder, Iterable<? extends JsonNode> functions) {
- final Map<FunctionType, RemoteFunctionSpec> definedFunctions =
+ final Map<FunctionType, GrpcFunctionSpec> definedFunctions =
StreamSupport.stream(functions.spliterator(), false)
.map(JsonModule::parseRemoteFunctionSpec)
- .collect(toMap(RemoteFunctionSpec::functionType, Function.identity()));
+ .collect(toMap(GrpcFunctionSpec::functionType, Function.identity()));
// currently we had a single function type that can be specified at a module.yaml
// and this is the RemoteFunction. Therefore we translate immediately the function spec
// to a (GRPC) RemoteFunctionProvider.
- RemoteFunctionProvider provider = new RemoteFunctionProvider(definedFunctions);
+ GrpcFunctionProvider provider = new GrpcFunctionProvider(definedFunctions);
for (FunctionType type : definedFunctions.keySet()) {
binder.bindFunctionProvider(type, provider);
}
@@ -168,10 +168,10 @@ final class JsonModule implements StatefulFunctionModule {
// Functions
// ----------------------------------------------------------------------------------------------------------
- private static RemoteFunctionSpec parseRemoteFunctionSpec(JsonNode functionNode) {
+ private static GrpcFunctionSpec parseRemoteFunctionSpec(JsonNode functionNode) {
FunctionType functionType = functionType(functionNode);
InetSocketAddress functionAddress = functionAddress(functionNode);
- return new RemoteFunctionSpec(functionType, functionAddress);
+ return new GrpcFunctionSpec(functionType, functionAddress);
}
private static FunctionType functionType(JsonNode functionNode) {
diff --git a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
index fe345f2..c659fe9 100644
--- a/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
+++ b/statefun-flink/statefun-flink-core/src/test/resources/bar-module/module.yaml
@@ -20,16 +20,17 @@ module:
functions:
- function:
meta:
+ kind: grpc
type: com.example/hello
spec:
host: localhost
port: 5000
- function:
meta:
+ kind: http
type: com.foo/world
spec:
- host: localhost
- port: 8855
+ endpoint: localhost:5959/statefun
routers:
- router:
meta: