You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2021/02/01 05:49:47 UTC

[flink-statefun] 02/03: [FLINK-21154] Rename packages for request-reply protocol Proto messages

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 36973beb8081c1005964dc2c082ebe41d0064b43
Author: Tzu-Li (Gordon) Tai <tz...@apache.org>
AuthorDate: Thu Jan 28 14:05:36 2021 +0800

    [FLINK-21154] Rename packages for request-reply protocol Proto messages
---
 .../statefun/flink/core/common/PolyglotUtil.java   |  2 +-
 .../flink/core/httpfn/HttpRequestReplyClient.java  |  4 ++--
 .../reqreply/PersistedRemoteFunctionValues.java    | 10 +++++-----
 .../flink/core/reqreply/RequestReplyClient.java    |  4 ++--
 .../flink/core/reqreply/RequestReplyFunction.java  | 14 +++++++-------
 .../PersistedRemoteFunctionValuesTest.java         |  8 ++++----
 .../core/reqreply/RequestReplyFunctionTest.java    | 22 +++++++++++-----------
 .../io/kafka/GenericKafkaEgressSerializer.java     |  2 +-
 .../polyglot/GenericKinesisEgressSerializer.java   |  2 +-
 .../src/main/protobuf/io/kafka-egress.proto        |  5 +++--
 .../src/main/protobuf/io/kinesis-egress.proto      |  5 +++--
 .../src/main/protobuf/sdk/request-reply.proto      | 15 ++++++++-------
 12 files changed, 48 insertions(+), 45 deletions(-)

diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java
index 18cc5bb..eeb9468 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/PolyglotUtil.java
@@ -22,8 +22,8 @@ import com.google.protobuf.Message;
 import com.google.protobuf.Parser;
 import java.io.IOException;
 import java.io.InputStream;
-import org.apache.flink.statefun.flink.core.polyglot.generated.Address;
 import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
 
 public final class PolyglotUtil {
   private PolyglotUtil() {}
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
index f27528d..fd7e5fb 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/HttpRequestReplyClient.java
@@ -33,10 +33,10 @@ import okhttp3.Request;
 import okhttp3.RequestBody;
 import okhttp3.Response;
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
 import org.apache.flink.statefun.flink.core.reqreply.RequestReplyClient;
 import org.apache.flink.statefun.flink.core.reqreply.ToFunctionRequestSummary;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
 import org.apache.flink.util.IOUtils;
 
 final class HttpRequestReplyClient implements RequestReplyClient {
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
index e0b43bc..42cffbe 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValues.java
@@ -23,14 +23,14 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest;
 import org.apache.flink.statefun.flink.core.types.remote.RemoteValueTypeMismatchException;
 import org.apache.flink.statefun.sdk.TypeName;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.InvocationBatchRequest;
 import org.apache.flink.statefun.sdk.state.Expiration;
 import org.apache.flink.statefun.sdk.state.PersistedStateRegistry;
 import org.apache.flink.statefun.sdk.state.RemotePersistedValue;
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
index fef64c5..19fddee 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java
@@ -20,8 +20,8 @@ package org.apache.flink.statefun.flink.core.reqreply;
 
 import java.util.concurrent.CompletableFuture;
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
 
 public interface RequestReplyClient {
 
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
index b2054f2..51db78c 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunction.java
@@ -28,19 +28,19 @@ import java.util.concurrent.CompletableFuture;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.IncompleteInvocationContext;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest;
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.AsyncOperationResult;
 import org.apache.flink.statefun.sdk.Context;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.annotations.Persisted;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.EgressMessage;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.IncompleteInvocationContext;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.InvocationResponse;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.Invocation;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.InvocationBatchRequest;
 import org.apache.flink.statefun.sdk.state.PersistedAppendingBuffer;
 import org.apache.flink.statefun.sdk.state.PersistedValue;
 import org.apache.flink.types.Either;
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
index a563886..b5f2927 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/PersistedRemoteFunctionValuesTest.java
@@ -26,11 +26,11 @@ import static org.hamcrest.core.Is.is;
 import com.google.protobuf.ByteString;
 import java.util.Arrays;
 import java.util.Collections;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.InvocationBatchRequest;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.PersistedValue;
 import org.apache.flink.statefun.sdk.TypeName;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.InvocationBatchRequest;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.PersistedValue;
 import org.junit.Test;
 
 public class PersistedRemoteFunctionValuesTest {
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
index d545281..9b5d9c9 100644
--- a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyFunctionTest.java
@@ -42,22 +42,22 @@ import org.apache.flink.statefun.flink.core.TestUtils;
 import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
 import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
 import org.apache.flink.statefun.flink.core.metrics.RemoteInvocationMetrics;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.DelayedInvocation;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.EgressMessage;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.ExpirationSpec;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.IncompleteInvocationContext;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.InvocationResponse;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueMutation.MutationType;
-import org.apache.flink.statefun.flink.core.polyglot.generated.FromFunction.PersistedValueSpec;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction;
-import org.apache.flink.statefun.flink.core.polyglot.generated.ToFunction.Invocation;
 import org.apache.flink.statefun.sdk.Address;
 import org.apache.flink.statefun.sdk.AsyncOperationResult;
 import org.apache.flink.statefun.sdk.AsyncOperationResult.Status;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.DelayedInvocation;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.EgressMessage;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.IncompleteInvocationContext;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.InvocationResponse;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation.MutationType;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.Invocation;
 import org.junit.Test;
 
 public class RequestReplyFunctionTest {
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
index e20bdf1..fb8a484 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
@@ -20,7 +20,7 @@ package org.apache.flink.statefun.flink.io.kafka;
 import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
 import java.nio.charset.StandardCharsets;
-import org.apache.flink.statefun.flink.io.generated.KafkaProducerRecord;
+import org.apache.flink.statefun.sdk.egress.generated.KafkaProducerRecord;
 import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java
index db3db58..4b1c522 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kinesis/polyglot/GenericKinesisEgressSerializer.java
@@ -20,7 +20,7 @@ package org.apache.flink.statefun.flink.io.kinesis.polyglot;
 
 import com.google.protobuf.Any;
 import com.google.protobuf.InvalidProtocolBufferException;
-import org.apache.flink.statefun.flink.io.generated.KinesisEgressRecord;
+import org.apache.flink.statefun.sdk.egress.generated.KinesisEgressRecord;
 import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
 import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
 
diff --git a/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto b/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
index 9284e1c..dc280b6 100644
--- a/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
+++ b/statefun-sdk-protos/src/main/protobuf/io/kafka-egress.proto
@@ -18,8 +18,9 @@
 
 syntax = "proto3";
 
-package org.apache.flink.statefun.flink.io;
-option java_package = "org.apache.flink.statefun.flink.io.generated";
+package io.statefun.sdk.egress;
+
+option java_package = "org.apache.flink.statefun.sdk.egress.generated";
 option java_multiple_files = true;
 
 message KafkaProducerRecord {
diff --git a/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto b/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
index 68c92c0..a365443 100644
--- a/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
+++ b/statefun-sdk-protos/src/main/protobuf/io/kinesis-egress.proto
@@ -18,8 +18,9 @@
 
 syntax = "proto3";
 
-package org.apache.flink.statefun.flink.io;
-option java_package = "org.apache.flink.statefun.flink.io.generated";
+package io.statefun.sdk.egress;
+
+option java_package = "org.apache.flink.statefun.sdk.egress.generated";
 option java_multiple_files = true;
 
 message KinesisEgressRecord {
diff --git a/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto b/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
index b70e3a4..2ebd8f9 100644
--- a/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
+++ b/statefun-sdk-protos/src/main/protobuf/sdk/request-reply.proto
@@ -18,8 +18,9 @@
 
 syntax = "proto3";
 
-package org.apache.flink.statefun.flink.core.polyglot;
-option java_package = "org.apache.flink.statefun.flink.core.polyglot.generated";
+package io.statefun.sdk.reqreply;
+
+option java_package = "org.apache.flink.statefun.sdk.reqreply.generated";
 option java_multiple_files = true;
 
 import "google/protobuf/any.proto";
@@ -128,7 +129,7 @@ message FromFunction {
         google.protobuf.Any argument = 3;
     }
 
-    // InvocationResponse represents a result of an org.apache.flink.statefun.flink.core.polyglot.ToFunction.InvocationBatchRequest
+    // InvocationResponse represents a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest
     // it contains a list of state mutation to preform as a result of computing this batch, and a list of outgoing messages.
     message InvocationResponse {
         repeated PersistedValueMutation state_mutations = 1;
@@ -155,17 +156,17 @@ message FromFunction {
         string type_typename = 3;
     }
 
-    // IncompleteInvocationContext represents a result of an org.apache.flink.statefun.flink.core.polyglot.ToFunction.InvocationBatchRequest,
+    // IncompleteInvocationContext represents a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest,
     // which should be used as the response if the InvocationBatchRequest provided incomplete information about the
     // invocation, e.g. insufficient state values were provided.
     message IncompleteInvocationContext {
         repeated PersistedValueSpec missing_values = 1;
     }
 
-    // Response sent from the function, as a result of an org.apache.flink.statefun.flink.core.polyglot.ToFunction.InvocationBatchRequest.
+    // Response sent from the function, as a result of an io.statefun.sdk.reqreply.ToFunction.InvocationBatchRequest.
     // It can be one of the following types:
-    //   - org.apache.flink.statefun.flink.core.polyglot.FromFunction.InvocationResponse
-    //   - org.apache.flink.statefun.flink.core.polyglot.FromFunction.IncompleteInvocationContext
+    //   - io.statefun.sdk.reqreply.FromFunction.InvocationResponse
+    //   - io.statefun.sdk.reqreply.FromFunction.IncompleteInvocationContext
     oneof response {
         InvocationResponse invocation_result = 100;
         IncompleteInvocationContext incomplete_invocation_context = 101;