You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2021/02/24 11:30:23 UTC
[flink-statefun] 01/01: [FLINK-21459] Implement remote Java SDK for
Stateful Functions
This is an automated email from the ASF dual-hosted git repository.
tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git
commit 20b521e5c131c914617008033bd4c63fba81fe97
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue Feb 23 18:09:36 2021 +0800
[FLINK-21459] Implement remote Java SDK for Stateful Functions
Add the initial implementation of the Java SDK on top of the new remote invocation protocol.
Co-authored-by: Tzu-Li (Gordon) Tai <tz...@apache.org>
This closes #201.
---
statefun-sdk-java/pom.xml | 19 +-
.../java/com/google/protobuf/MoreByteStrings.java | 39 ++
.../apache/flink/statefun/sdk/java/Address.java | 85 ++++
.../statefun/sdk/java/AddressScopedStorage.java | 28 ++
.../flink/statefun/sdk/java/ApiExtension.java | 34 ++
.../apache/flink/statefun/sdk/java/Context.java | 43 ++
.../apache/flink/statefun/sdk/java/Expiration.java | 92 +++++
.../flink/statefun/sdk/java/StatefulFunction.java | 26 ++
.../statefun/sdk/java/StatefulFunctionSpec.java | 78 ++++
.../flink/statefun/sdk/java/StatefulFunctions.java | 46 +++
.../apache/flink/statefun/sdk/java/TypeName.java | 137 +++++++
.../apache/flink/statefun/sdk/java/ValueSpec.java | 115 ++++++
.../statefun/sdk/java/annotations/Internal.java | 30 ++
.../sdk/java/handler/ConcurrentContext.java | 147 +++++++
.../handler/ConcurrentRequestReplyHandler.java | 150 +++++++
.../statefun/sdk/java/handler/MoreFutures.java | 66 +++
.../statefun/sdk/java/handler/ProtoUtils.java | 97 +++++
.../sdk/java/handler/RequestReplyHandler.java | 34 ++
.../statefun/sdk/java/io/KafkaEgressMessage.java | 127 ++++++
.../statefun/sdk/java/io/KinesisEgressMessage.java | 146 +++++++
.../statefun/sdk/java/message/EgressMessage.java | 30 ++
.../sdk/java/message/EgressMessageWrapper.java | 55 +++
.../flink/statefun/sdk/java/message/Message.java | 60 +++
.../statefun/sdk/java/message/MessageBuilder.java | 113 +++++
.../statefun/sdk/java/message/MessageWrapper.java | 133 ++++++
.../statefun/sdk/java/slice/ByteStringSlice.java | 80 ++++
.../flink/statefun/sdk/java/slice/Slice.java | 40 ++
.../flink/statefun/sdk/java/slice/SliceOutput.java | 108 +++++
.../statefun/sdk/java/slice/SliceProtobufUtil.java | 55 +++
.../flink/statefun/sdk/java/slice/Slices.java | 61 +++
.../storage/ConcurrentAddressScopedStorage.java | 347 ++++++++++++++++
.../storage/IllegalStorageAccessException.java | 28 ++
.../sdk/java/storage/StateValueContexts.java | 131 ++++++
.../flink/statefun/sdk/java/types/SimpleType.java | 104 +++++
.../apache/flink/statefun/sdk/java/types/Type.java | 33 ++
.../sdk/java/types/TypeCharacteristics.java | 22 +
.../statefun/sdk/java/types/TypeSerializer.java | 27 ++
.../flink/statefun/sdk/java/types/Types.java | 456 +++++++++++++++++++++
.../handler/ConcurrentRequestReplyHandlerTest.java | 116 ++++++
.../statefun/sdk/java/handler/MoreFuturesTest.java | 93 +++++
.../flink/statefun/sdk/java/handler/TestUtils.java | 96 +++++
.../statefun/sdk/java/slice/SliceOutputTest.java | 144 +++++++
.../sdk/java/slice/SliceProtobufUtilTest.java | 36 ++
.../ConcurrentAddressScopedStorageTest.java | 206 ++++++++++
.../sdk/java/storage/StateValueContextsTest.java | 150 +++++++
.../statefun/sdk/java/storage/TestMutableType.java | 77 ++++
.../sdk/java/types/SanityPrimitiveTypeTest.java | 194 +++++++++
47 files changed, 4529 insertions(+), 5 deletions(-)
diff --git a/statefun-sdk-java/pom.xml b/statefun-sdk-java/pom.xml
index 62bb8de..f58829a 100644
--- a/statefun-sdk-java/pom.xml
+++ b/statefun-sdk-java/pom.xml
@@ -24,13 +24,10 @@ under the License.
<version>2.3-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
-
<artifactId>statefun-sdk-java</artifactId>
-
<properties>
<additional-sources.dir>target/additional-sources</additional-sources.dir>
</properties>
-
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
@@ -42,8 +39,20 @@ under the License.
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.12</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
-
<build>
<plugins>
<!--
@@ -145,4 +154,4 @@ under the License.
</plugin>
</plugins>
</build>
-</project>
+</project>
\ No newline at end of file
diff --git a/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java b/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java
new file mode 100644
index 0000000..9c4269e
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.protobuf;
+
+import java.nio.ByteBuffer;
+
+public class MoreByteStrings {
+
+ public static ByteString wrap(byte[] bytes) {
+ return ByteString.wrap(bytes);
+ }
+
+ public static ByteString wrap(byte[] bytes, int offset, int len) {
+ return ByteString.wrap(bytes, offset, len);
+ }
+
+ public static ByteString wrap(ByteBuffer buffer) {
+ return ByteString.wrap(buffer);
+ }
+
+ public static ByteString concat(ByteString first, ByteString second) {
+ return first.concat(second);
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java
new file mode 100644
index 0000000..86dbe76
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.Objects;
+
+/**
+ * An {@link Address} is the unique identity of an individual {@link StatefulFunction}, consisting
+ * of the function's {@link TypeName} and a unique identifier within the type. The function's type
+ * denotes the class of function to invoke, while the unique identifier addresses the invocation to
+ * a specific function instance.
+ */
+public final class Address {
+ private final TypeName type;
+ private final String id;
+
+ /**
+ * Creates an {@link Address}.
+ *
+ * @param type type of the function.
+ * @param id unique id within the function type.
+ */
+ public Address(TypeName type, String id) {
+ this.type = Objects.requireNonNull(type);
+ this.id = Objects.requireNonNull(id);
+ }
+
+ /**
+ * Returns the {@link TypeName} that this address identifies.
+ *
+ * @return type of the function
+ */
+ public TypeName type() {
+ return type;
+ }
+
+ /**
+ * Returns the unique function id, within its type, that this address identifies.
+ *
+ * @return unique id within the function type.
+ */
+ public String id() {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Address address = (Address) o;
+ return type.equals(address.type) && id.equals(address.id);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 0;
+ hash = 37 * hash + type.hashCode();
+ hash = 37 * hash + id.hashCode();
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Address(%s, %s, %s)", type.namespace(), type.name(), id);
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
new file mode 100644
index 0000000..d04310a
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.Optional;
+
+public interface AddressScopedStorage {
+ <T> Optional<T> get(ValueSpec<T> descriptor);
+
+ <T> void set(ValueSpec<T> key, T value);
+
+ <T> void remove(ValueSpec<T> key);
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java
new file mode 100644
index 0000000..2d4ebe4
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+
+@Internal
+public final class ApiExtension {
+
+ public static ByteString typeNameByteString(TypeName typeName) {
+ return typeName.typeNameByteString();
+ }
+
+ public static ByteString stateNameByteString(ValueSpec<?> spec) {
+ return spec.nameByteString();
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
new file mode 100644
index 0000000..9bc011b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+public interface Context {
+
+ Address self();
+
+ Optional<Address> caller();
+
+ void send(Message message);
+
+ void sendAfter(Duration duration, Message message);
+
+ void send(EgressMessage message);
+
+ AddressScopedStorage storage();
+
+ default CompletableFuture<Void> done() {
+ return CompletableFuture.completedFuture(null);
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
new file mode 100644
index 0000000..060c7ab
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * State Expiration Configuration
+ *
+ * <p>This class defines the way state can be auto expired by the runtime. State expiration (also
+ * known as state TTL) can be used to keep state from growing arbitrarily by assigning an expiration
+ * date to a value.
+ *
+ * <p>State can be expired after a duration has passed since either the last write to the state
+ * or the last read.
+ */
+public final class Expiration implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ public enum Mode {
+ NONE,
+ AFTER_WRITE,
+ AFTER_READ_OR_WRITE;
+ }
+
+ /**
+ * Returns an Expiration configuration that would expire a @duration after the last write.
+ *
+ * @param duration a duration to wait before considering the state expired.
+ */
+ public static Expiration expireAfterWriting(Duration duration) {
+ return new Expiration(Mode.AFTER_WRITE, duration);
+ }
+
+ /**
+ * Returns an Expiration configuration that would expire a @duration after the last write or read.
+ *
+ * @param duration a duration to wait before considering the state expired.
+ */
+ public static Expiration expireAfterReadingOrWriting(Duration duration) {
+ return new Expiration(Mode.AFTER_READ_OR_WRITE, duration);
+ }
+
+ public static Expiration expireAfter(Duration duration, Mode mode) {
+ return new Expiration(mode, duration);
+ }
+
+ /** @return Returns a disabled expiration */
+ public static Expiration none() {
+ return new Expiration(Mode.NONE, Duration.ZERO);
+ }
+
+ private final Mode mode;
+ private final Duration duration;
+
+ private Expiration(Mode mode, Duration duration) {
+ this.mode = Objects.requireNonNull(mode);
+ this.duration = Objects.requireNonNull(duration);
+ }
+
+ public Mode mode() {
+ return mode;
+ }
+
+ public Duration duration() {
+ return duration;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("Expiration{mode=%s, duration=%s}", mode, duration);
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
new file mode 100644
index 0000000..bd9aef6
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+public interface StatefulFunction {
+
+ CompletableFuture<Void> apply(Context context, Message argument) throws Throwable;
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
new file mode 100644
index 0000000..b7dc7bc
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+public final class StatefulFunctionSpec {
+ private final TypeName typeName;
+ private final Map<String, ValueSpec<?>> knownValues;
+ private final Supplier<? extends StatefulFunction> supplier;
+
+ public static Builder builder(TypeName typeName) {
+ return new Builder(typeName);
+ }
+
+ private StatefulFunctionSpec(
+ TypeName typeName,
+ Map<String, ValueSpec<?>> knownValues,
+ Supplier<? extends StatefulFunction> supplier) {
+ this.typeName = Objects.requireNonNull(typeName);
+ this.supplier = Objects.requireNonNull(supplier);
+ this.knownValues = Objects.requireNonNull(knownValues);
+ }
+
+ public TypeName typeName() {
+ return typeName;
+ }
+
+ public Map<String, ValueSpec<?>> knownValues() {
+ return knownValues;
+ }
+
+ public Supplier<? extends StatefulFunction> supplier() {
+ return supplier;
+ }
+
+ public static final class Builder {
+ private final TypeName typeName;
+ private final Map<String, ValueSpec<?>> knownValues = new HashMap<>();
+ private Supplier<? extends StatefulFunction> supplier;
+
+ private Builder(TypeName typeName) {
+ this.typeName = Objects.requireNonNull(typeName);
+ }
+
+ public Builder withValueSpec(ValueSpec<?> valueSpec) {
+ knownValues.put(valueSpec.name(), valueSpec);
+ return this;
+ }
+
+ public Builder withSupplier(Supplier<? extends StatefulFunction> supplier) {
+ this.supplier = Objects.requireNonNull(supplier);
+ return this;
+ }
+
+ public StatefulFunctionSpec build() {
+ return new StatefulFunctionSpec(typeName, knownValues, supplier);
+ }
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
new file mode 100644
index 0000000..9561827
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+
+public class StatefulFunctions {
+ private final Map<TypeName, StatefulFunctionSpec> specs = new HashMap<>();
+
+ public StatefulFunctions withStatefulFunction(StatefulFunctionSpec.Builder builder) {
+ StatefulFunctionSpec spec = builder.build();
+ specs.put(spec.typeName(), spec);
+ return this;
+ }
+
+ public StatefulFunctions withStatefulFunction(StatefulFunctionSpec spec) {
+ specs.put(spec.typeName(), spec);
+ return this;
+ }
+
+ public Map<TypeName, StatefulFunctionSpec> functionSpecs() {
+ return specs;
+ }
+
+ public RequestReplyHandler requestReplyHandler() {
+ return new ConcurrentRequestReplyHandler(specs);
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
new file mode 100644
index 0000000..ff7fde5
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * This class represents the type of a {@code StatefulFunction}, consisting of a namespace of the
+ * function type as well as the type's name.
+ *
+ * <p>A function's type is part of a function's {@link Address} and serves as an integral part of
+ * an individual function's identity.
+ *
+ * @see Address
+ */
+public final class TypeName implements Serializable {
+
+ private static final long serialVersionUID = 1;
+
+ private final String namespace;
+ private final String type;
+ private final String typenameString;
+ private final ByteString typenameByteString;
+
+ public static TypeName typeNameOf(String namespace, String name) {
+ Objects.requireNonNull(namespace);
+ Objects.requireNonNull(name);
+ if (namespace.endsWith("/")) {
+ namespace = namespace.substring(0, namespace.length() - 1);
+ }
+ if (namespace.isEmpty()) {
+ throw new IllegalArgumentException("namespace can not be empty.");
+ }
+ if (name.isEmpty()) {
+ throw new IllegalArgumentException("name can not be empty.");
+ }
+ return new TypeName(namespace, name);
+ }
+
+ public static TypeName typeNameFromString(String typeNameString) {
+ Objects.requireNonNull(typeNameString);
+ final int pos = typeNameString.lastIndexOf("/");
+ if (pos <= 0 || pos == typeNameString.length() - 1) {
+ throw new IllegalArgumentException(
+ typeNameString + " does not conform to the <namespace>/<name> format");
+ }
+ String namespace = typeNameString.substring(0, pos);
+ String name = typeNameString.substring(pos + 1);
+ return typeNameOf(namespace, name);
+ }
+
+ /**
+ * Creates a {@link TypeName}.
+ *
+ * @param namespace the function type's namespace.
+ * @param type the function type's name.
+ */
+ private TypeName(String namespace, String type) {
+ this.namespace = Objects.requireNonNull(namespace);
+ this.type = Objects.requireNonNull(type);
+ String typenameString = canonicalTypeNameString(namespace, type);
+ this.typenameString = typenameString;
+ this.typenameByteString = ByteString.copyFromUtf8(typenameString);
+ }
+
+ /**
+ * Returns the namespace of the function type.
+ *
+ * @return the namespace of the function type.
+ */
+ public String namespace() {
+ return namespace;
+ }
+
+ /**
+ * Returns the name of the function type.
+ *
+ * @return the name of the function type.
+ */
+ public String name() {
+ return type;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TypeName functionType = (TypeName) o;
+ return namespace.equals(functionType.namespace) && type.equals(functionType.type);
+ }
+
+ @Override
+ public int hashCode() {
+ int hash = 0;
+ hash = 37 * hash + namespace.hashCode();
+ hash = 37 * hash + type.hashCode();
+ return hash;
+ }
+
+ @Override
+ public String toString() {
+ return "TypeName(" + namespace + ", " + type + ")";
+ }
+
+ public String asTypeNameString() {
+ return typenameString;
+ }
+
+ ByteString typeNameByteString() {
+ return typenameByteString;
+ }
+
+ private static String canonicalTypeNameString(String namespace, String type) {
+ return namespace + '/' + type;
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
new file mode 100644
index 0000000..d62d2b7
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.Types;
+
+public final class ValueSpec<T> {
+
+ public static Untyped named(String name) {
+ Objects.requireNonNull(name);
+ return new Untyped(name);
+ }
+
+ private final String name;
+ private final Expiration expiration;
+ private final Type<T> type;
+ private final ByteString nameByteString;
+
+ private ValueSpec(Untyped untyped, Type<T> type) {
+ Objects.requireNonNull(untyped);
+ Objects.requireNonNull(type);
+ this.name = untyped.stateName;
+ this.expiration = untyped.expiration;
+ this.type = Objects.requireNonNull(type);
+ this.nameByteString = ByteString.copyFromUtf8(untyped.stateName);
+ }
+
+ public String name() {
+ return name;
+ }
+
+ public Expiration expiration() {
+ return expiration;
+ }
+
+ public TypeName typeName() {
+ return type.typeName();
+ }
+
+ public Type<T> type() {
+ return type;
+ }
+
+ ByteString nameByteString() {
+ return nameByteString;
+ }
+
+ public static final class Untyped {
+ private final String stateName;
+ private Expiration expiration = Expiration.none();
+
+ public Untyped(String name) {
+ this.stateName = Objects.requireNonNull(name);
+ }
+
+ public Untyped thatExpireAfterWrite(Duration duration) {
+ this.expiration = Expiration.expireAfterWriting(duration);
+ return this;
+ }
+
+ public Untyped thatExpiresAfterReadOrWrite(Duration duration) {
+ this.expiration = Expiration.expireAfterReadingOrWriting(duration);
+ return this;
+ }
+
+ public ValueSpec<Integer> withIntType() {
+ return withCustomType(Types.integerType());
+ }
+
+ public ValueSpec<Long> withLongType() {
+ return withCustomType(Types.longType());
+ }
+
+ public ValueSpec<Float> withFloatType() {
+ return withCustomType(Types.floatType());
+ }
+
+ public ValueSpec<Double> withDoubleType() {
+ return withCustomType(Types.doubleType());
+ }
+
+ public ValueSpec<String> withUtf8String() {
+ return withCustomType(Types.stringType());
+ }
+
+ public ValueSpec<Boolean> withBooleanType() {
+ return new ValueSpec<>(this, Types.booleanType());
+ }
+
+ public <T> ValueSpec<T> withCustomType(Type<T> type) {
+ Objects.requireNonNull(type);
+ return new ValueSpec<>(this, type);
+ }
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java
new file mode 100644
index 0000000..a544b71
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Methods, constructors or classes annotated with this annotation, are used for by the SDK
+ * internally.
+ */
+@Documented
+@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.TYPE})
+public @interface Internal {}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java
new file mode 100644
index 0000000..49e9fcc
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.getTypedValue;
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.protoAddressFromSdk;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+
+/**
+ * A thread safe implementation of a {@linkplain Context}.
+ *
+ * <p>This context's life cycle is tied to a single batch request. It is constructed when a
+ * {@linkplain org.apache.flink.statefun.sdk.reqreply.generated.ToFunction} message arrives, and it
+ * carries enough context to compute an {@linkplain
+ * org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.InvocationResponse}. Access to the
+ * send/sendAfter/sendEgress methods are synchronized with a @responseBuilder's lock, to prevent
+ * concurrent modification. When the last invocation of the batch completes successfully, a {@link
+ * #finalBuilder()} will be called. After that point no further operations are allowed.
+ */
+final class ConcurrentContext implements Context {
+ private final org.apache.flink.statefun.sdk.java.Address self;
+ private final FromFunction.InvocationResponse.Builder responseBuilder;
+ private final ConcurrentAddressScopedStorage storage;
+ private boolean noFurtherModificationsAllowed;
+
+ private Address caller;
+
+ public ConcurrentContext(
+ org.apache.flink.statefun.sdk.java.Address self,
+ FromFunction.InvocationResponse.Builder responseBuilder,
+ ConcurrentAddressScopedStorage storage) {
+ this.self = Objects.requireNonNull(self);
+ this.responseBuilder = Objects.requireNonNull(responseBuilder);
+ this.storage = Objects.requireNonNull(storage);
+ }
+
+ @Override
+ public org.apache.flink.statefun.sdk.java.Address self() {
+ return self;
+ }
+
+ void setCaller(Address address) {
+ this.caller = address;
+ }
+
+ FromFunction.InvocationResponse.Builder finalBuilder() {
+ synchronized (responseBuilder) {
+ noFurtherModificationsAllowed = true;
+ return responseBuilder;
+ }
+ }
+
+ @Override
+ public Optional<Address> caller() {
+ return Optional.ofNullable(caller);
+ }
+
+ @Override
+ public void send(Message message) {
+ Objects.requireNonNull(message);
+
+ FromFunction.Invocation outInvocation =
+ FromFunction.Invocation.newBuilder()
+ .setArgument(getTypedValue(message))
+ .setTarget(protoAddressFromSdk(message.targetAddress()))
+ .build();
+
+ synchronized (responseBuilder) {
+ checkNotDone();
+ responseBuilder.addOutgoingMessages(outInvocation);
+ }
+ }
+
+ @Override
+ public void sendAfter(Duration duration, Message message) {
+ Objects.requireNonNull(duration);
+ Objects.requireNonNull(message);
+
+ FromFunction.DelayedInvocation outInvocation =
+ FromFunction.DelayedInvocation.newBuilder()
+ .setArgument(getTypedValue(message))
+ .setTarget(protoAddressFromSdk(message.targetAddress()))
+ .setDelayInMs(duration.toMillis())
+ .build();
+
+ synchronized (responseBuilder) {
+ checkNotDone();
+ responseBuilder.addDelayedInvocations(outInvocation);
+ }
+ }
+
+ @Override
+ public void send(EgressMessage message) {
+ Objects.requireNonNull(message);
+
+ TypeName target = message.targetEgressId();
+
+ FromFunction.EgressMessage outInvocation =
+ FromFunction.EgressMessage.newBuilder()
+ .setArgument(getTypedValue(message))
+ .setEgressNamespace(target.namespace())
+ .setEgressType(target.name())
+ .build();
+
+ synchronized (responseBuilder) {
+ checkNotDone();
+ responseBuilder.addOutgoingEgresses(outInvocation);
+ }
+ }
+
+ @Override
+ public AddressScopedStorage storage() {
+ return storage;
+ }
+
+ private void checkNotDone() {
+ if (noFurtherModificationsAllowed) {
+ throw new IllegalStateException("Function has already completed its execution.");
+ }
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java
new file mode 100644
index 0000000..7397525
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.MoreFutures.applySequentially;
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.sdkAddressFromProto;
+
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.message.MessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.storage.StateValueContexts;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * A threadsafe {@linkplain RequestReplyHandler}. This handler lifecycle is bound to the entire
+ * program, and can be safely and concurrently used to handle {@linkplain ToFunction} requests.
+ */
+@Internal
+public final class ConcurrentRequestReplyHandler implements RequestReplyHandler {
+ private final Map<TypeName, StatefulFunctionSpec> functionSpecs;
+
+ public ConcurrentRequestReplyHandler(Map<TypeName, StatefulFunctionSpec> functionSpecs) {
+ this.functionSpecs = Objects.requireNonNull(functionSpecs);
+ }
+
+ @Override
+ public CompletableFuture<Slice> handle(Slice requestBytes) {
+ try {
+ ByteString in = SliceProtobufUtil.asByteString(requestBytes);
+ ToFunction request = ToFunction.parseFrom(in);
+ CompletableFuture<FromFunction> response = handleInternally(request);
+ return response.thenApply(
+ res -> {
+ ByteString out = res.toByteString();
+ return SliceProtobufUtil.asSlice(out);
+ });
+ } catch (Throwable throwable) {
+ return MoreFutures.exceptional(throwable);
+ }
+ }
+
+ CompletableFuture<FromFunction> handleInternally(ToFunction request) {
+ if (!request.hasInvocation()) {
+ return CompletableFuture.completedFuture(FromFunction.getDefaultInstance());
+ }
+ ToFunction.InvocationBatchRequest batchRequest = request.getInvocation();
+ Address self = sdkAddressFromProto(batchRequest.getTarget());
+ StatefulFunctionSpec targetSpec = functionSpecs.get(self.type());
+ if (targetSpec == null) {
+ throw new IllegalStateException("Unknown target type " + self);
+ }
+ Supplier<? extends StatefulFunction> supplier = targetSpec.supplier();
+ if (supplier == null) {
+ throw new NullPointerException("missing function supplier for " + self);
+ }
+ StatefulFunction function = supplier.get();
+ if (function == null) {
+ throw new NullPointerException("supplier for " + self + " supplied NULL function.");
+ }
+ StateValueContexts.ResolutionResult stateResolution =
+ StateValueContexts.resolve(targetSpec.knownValues(), batchRequest.getStateList());
+ if (stateResolution.hasMissingValues()) {
+ // not enough information to compute this batch.
+ FromFunction res = buildIncompleteInvocationResponse(stateResolution.missingValues());
+ return CompletableFuture.completedFuture(res);
+ }
+ final ConcurrentAddressScopedStorage storage =
+ new ConcurrentAddressScopedStorage(stateResolution.resolved());
+ return executeBatch(batchRequest, self, storage, function);
+ }
+
+ private CompletableFuture<FromFunction> executeBatch(
+ ToFunction.InvocationBatchRequest inputBatch,
+ Address self,
+ ConcurrentAddressScopedStorage storage,
+ StatefulFunction function) {
+
+ FromFunction.InvocationResponse.Builder responseBuilder =
+ FromFunction.InvocationResponse.newBuilder();
+
+ ConcurrentContext context = new ConcurrentContext(self, responseBuilder, storage);
+
+ CompletableFuture<Void> allDone =
+ applySequentially(
+ inputBatch.getInvocationsList(), invocation -> apply(function, context, invocation));
+
+ return allDone.thenApply(unused -> finalizeResponse(storage, context.finalBuilder()));
+ }
+
+ private static FromFunction buildIncompleteInvocationResponse(List<ValueSpec<?>> missing) {
+ FromFunction.IncompleteInvocationContext.Builder result =
+ FromFunction.IncompleteInvocationContext.newBuilder();
+
+ for (ValueSpec<?> v : missing) {
+ result.addMissingValues(ProtoUtils.protoFromValueSpec(v));
+ }
+
+ return FromFunction.newBuilder().setIncompleteInvocationContext(result).build();
+ }
+
+ private static CompletableFuture<Void> apply(
+ StatefulFunction function, ConcurrentContext context, ToFunction.Invocation invocation)
+ throws Throwable {
+ TypedValue argument = invocation.getArgument();
+ MessageWrapper wrapper = new MessageWrapper(context.self(), argument);
+ context.setCaller(sdkAddressFromProto(invocation.getCaller()));
+ CompletableFuture<Void> future = function.apply(context, wrapper);
+ if (future == null) {
+ throw new IllegalStateException(
+ "User function " + context.self() + " has returned a NULL future.");
+ }
+ return future;
+ }
+
+ private static FromFunction finalizeResponse(
+ ConcurrentAddressScopedStorage storage, FromFunction.InvocationResponse.Builder builder) {
+ storage.addMutations(builder::addStateMutations);
+ return FromFunction.newBuilder().setInvocationResult(builder).build();
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java
new file mode 100644
index 0000000..bd3a9e1
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+ /** Static helpers for working with {@link CompletableFuture}s. */
+ final class MoreFutures {
+
+ private MoreFutures() {}
+
+ /** A function that is allowed to throw any {@link Throwable}. */
+ @FunctionalInterface
+ public interface Fn<I, O> {
+ O apply(I input) throws Throwable;
+ }
+
+ /**
+ * Applies {@code fn} to each element of {@code elements} sequentially. A subsequent element is
+ * handed to {@code fn} only after the future returned for the previous element has completed.
+ *
+ * @param elements the elements to process, in iteration order.
+ * @param fn the function to apply to each element.
+ * @return a future that completes normally once every element was processed. It completes
+ * exceptionally, without processing the remaining elements, if {@code fn} throws, returns
+ * {@code null}, or returns a future that completes exceptionally.
+ * @throws NullPointerException if {@code elements} or {@code fn} is {@code null}.
+ */
+ public static <T> CompletableFuture<Void> applySequentially(
+ Iterable<T> elements, Fn<T, CompletableFuture<Void>> fn) {
+ Objects.requireNonNull(elements);
+ Objects.requireNonNull(fn);
+ return applySequentially(elements.iterator(), fn);
+ }
+
+ private static <T> CompletableFuture<Void> applySequentially(
+ Iterator<T> iterator, Fn<T, CompletableFuture<Void>> fn) {
+ try {
+ while (iterator.hasNext()) {
+ T next = iterator.next();
+ CompletableFuture<Void> future = fn.apply(next);
+ if (future == null) {
+ // fail with a descriptive message instead of a bare NPE from future.isDone().
+ throw new NullPointerException("fn has returned a NULL future for element " + next);
+ }
+ if (!future.isDone()) {
+ // continue with the rest of the elements only once this future completes.
+ return future.thenCompose(ignored -> applySequentially(iterator, fn));
+ }
+ if (future.isCompletedExceptionally()) {
+ return future;
+ }
+ // already completed successfully: keep looping, so that completed futures do not nest.
+ }
+ return CompletableFuture.completedFuture(null);
+ } catch (Throwable t) {
+ return exceptional(t);
+ }
+ }
+
+ /** Returns a new future that is already completed exceptionally with {@code cause}. */
+ static <T> CompletableFuture<T> exceptional(Throwable cause) {
+ CompletableFuture<T> e = new CompletableFuture<>();
+ e.completeExceptionally(cause);
+ return e;
+ }
+ }
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java
new file mode 100644
index 0000000..cbeb5c3
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec.ExpireMode;
+import static org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec.newBuilder;
+
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.Expiration;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+ /** Conversions between the SDK types and the Protobuf messages of the request-reply protocol. */
+ final class ProtoUtils {
+ private ProtoUtils() {}
+
+ /** Converts an SDK address to its Protobuf counterpart. */
+ static Address protoAddressFromSdk(org.apache.flink.statefun.sdk.java.Address address) {
+ Address.Builder proto = Address.newBuilder();
+ proto.setNamespace(address.type().namespace());
+ proto.setType(address.type().name());
+ proto.setId(address.id());
+ return proto.build();
+ }
+
+ /**
+ * Converts a Protobuf address to an SDK address.
+ *
+ * @return the SDK address, or {@code null} if {@code address} is {@code null} or has an empty
+ * namespace, type and id.
+ */
+ static org.apache.flink.statefun.sdk.java.Address sdkAddressFromProto(Address address) {
+ if (address == null) {
+ return null;
+ }
+ final boolean allFieldsEmpty =
+ address.getNamespace().isEmpty()
+ && address.getType().isEmpty()
+ && address.getId().isEmpty();
+ if (allFieldsEmpty) {
+ return null;
+ }
+ final TypeName functionType = TypeName.typeNameOf(address.getNamespace(), address.getType());
+ return new org.apache.flink.statefun.sdk.java.Address(functionType, address.getId());
+ }
+
+ /**
+ * Builds the Protobuf description of {@code valueSpec}: its state name, its type name and, unless
+ * the expiration mode is {@code NONE}, its expiration.
+ */
+ static PersistedValueSpec.Builder protoFromValueSpec(ValueSpec<?> valueSpec) {
+ final PersistedValueSpec.Builder proto = PersistedValueSpec.newBuilder();
+ proto.setStateNameBytes(ApiExtension.stateNameByteString(valueSpec));
+ proto.setTypeTypenameBytes(ApiExtension.typeNameByteString(valueSpec.typeName()));
+
+ final Expiration expiration = valueSpec.expiration();
+ if (expiration.mode() == Expiration.Mode.NONE) {
+ return proto;
+ }
+
+ // every mode other than AFTER_READ_OR_WRITE is mapped to AFTER_WRITE.
+ final ExpireMode protoMode;
+ if (expiration.mode() == Expiration.Mode.AFTER_READ_OR_WRITE) {
+ protoMode = ExpireMode.AFTER_INVOKE;
+ } else {
+ protoMode = ExpireMode.AFTER_WRITE;
+ }
+ final long expireAfterMillis = expiration.duration().toMillis();
+
+ proto.setExpirationSpec(newBuilder().setExpireAfterMillis(expireAfterMillis).setMode(protoMode));
+ return proto;
+ }
+
+ /**
+ * Returns the {@link TypedValue} of {@code message}; a {@link MessageWrapper} hands out the one it
+ * already holds, any other message gets a new one built from its type name and raw value.
+ */
+ static TypedValue getTypedValue(Message message) {
+ if (!(message instanceof MessageWrapper)) {
+ TypedValue.Builder typedValue = TypedValue.newBuilder();
+ typedValue.setTypenameBytes(ApiExtension.typeNameByteString(message.valueTypeName()));
+ typedValue.setValue(SliceProtobufUtil.asByteString(message.rawValue()));
+ return typedValue.build();
+ }
+ return ((MessageWrapper) message).typedValue();
+ }
+
+ /**
+ * Returns the {@link TypedValue} of {@code message}; an {@link EgressMessageWrapper} hands out the
+ * one it already holds, any other message gets a new one built from its value type and bytes.
+ */
+ static TypedValue getTypedValue(EgressMessage message) {
+ if (!(message instanceof EgressMessageWrapper)) {
+ TypedValue.Builder typedValue = TypedValue.newBuilder();
+ typedValue.setTypenameBytes(ApiExtension.typeNameByteString(message.egressMessageValueType()));
+ typedValue.setValue(SliceProtobufUtil.asByteString(message.egressMessageValueBytes()));
+ return typedValue.build();
+ }
+ return ((EgressMessageWrapper) message).typedValue();
+ }
+ }
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java
new file mode 100644
index 0000000..ed7783c
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.handler;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+ /** Handles a serialized request and asynchronously produces a serialized response. */
+ public interface RequestReplyHandler {
+
+ /**
+ * Handles a {@code Stateful Functions} invocation.
+ *
+ * @param input a {@linkplain Slice} as received from the {@code StateFun} server.
+ * @return a serialized representation of the side-effects to perform by the {@code StateFun}
+ * server, as the result of this invocation.
+ */
+ CompletableFuture<Slice> handle(Slice input);
+ }
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java
new file mode 100644
index 0000000..c55b582
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.io;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.egress.generated.KafkaProducerRecord;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+public final class KafkaEgressMessage {
+
+ public static Builder forEgress(TypeName targetEgressId) {
+ Objects.requireNonNull(targetEgressId);
+ return new Builder(targetEgressId);
+ }
+
+ public static final class Builder {
+ private static final TypeName KAFKA_PRODUCER_RECORD_TYPENAME =
+ TypeName.typeNameOf(
+ "type.googleapis.com", KafkaProducerRecord.getDescriptor().getFullName());
+
+ private final TypeName targetEgressId;
+ private ByteString targetTopic;
+ private ByteString keyBytes;
+ private ByteString value;
+
+ private Builder(TypeName targetEgressId) {
+ this.targetEgressId = targetEgressId;
+ }
+
+ public Builder withTopic(String topic) {
+ this.targetTopic = ByteString.copyFromUtf8(topic);
+ return this;
+ }
+
+ public Builder withUtf8Key(String key) {
+ Objects.requireNonNull(key);
+ this.keyBytes = ByteString.copyFromUtf8(key);
+ return this;
+ }
+
+ public Builder withKey(byte[] key) {
+ Objects.requireNonNull(key);
+ this.keyBytes = ByteString.copyFrom(key);
+ return this;
+ }
+
+ public Builder withKey(Slice slice) {
+ Objects.requireNonNull(slice);
+ this.keyBytes = SliceProtobufUtil.asByteString(slice);
+ return this;
+ }
+
+ public <T> Builder withKey(Type<T> type, T value) {
+ TypeSerializer<T> serializer = type.typeSerializer();
+ return withKey(serializer.serialize(value));
+ }
+
+ public Builder withUtf8Value(String value) {
+ Objects.requireNonNull(value);
+ this.value = ByteString.copyFromUtf8(value);
+ return this;
+ }
+
+ public Builder withValue(Slice slice) {
+ Objects.requireNonNull(value);
+ this.value = SliceProtobufUtil.asByteString(slice);
+ return this;
+ }
+
+ public <T> Builder withValue(Type<T> type, T value) {
+ TypeSerializer<T> serializer = type.typeSerializer();
+ return withValue(serializer.serialize(value));
+ }
+
+ public Builder withValue(byte[] value) {
+ Objects.requireNonNull(value);
+ this.value = ByteString.copyFrom(value);
+ return this;
+ }
+
+ public EgressMessage build() {
+ if (targetTopic == null) {
+ throw new IllegalStateException("A Kafka record requires a target topic.");
+ }
+ if (value == null) {
+ throw new IllegalStateException("A Kafka record requires value bytes");
+ }
+ KafkaProducerRecord.Builder builder =
+ KafkaProducerRecord.newBuilder().setTopicBytes(targetTopic).setValueBytes(value);
+ if (keyBytes != null) {
+ builder.setKeyBytes(keyBytes);
+ }
+ KafkaProducerRecord record = builder.build();
+ TypedValue typedValue =
+ TypedValue.newBuilder()
+ .setTypenameBytes(ApiExtension.typeNameByteString(KAFKA_PRODUCER_RECORD_TYPENAME))
+ .setValue(record.toByteString())
+ .build();
+
+ return new EgressMessageWrapper(targetEgressId, typedValue);
+ }
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java
new file mode 100644
index 0000000..000bb59
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.io;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.egress.generated.KinesisEgressRecord;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+ /** Entry point for building {@link EgressMessage}s that carry a {@link KinesisEgressRecord}. */
+ public final class KinesisEgressMessage {
+
+ /**
+ * Starts building a Kinesis egress message.
+ *
+ * @param targetEgressId the id of the target egress, must not be {@code null}.
+ */
+ public static Builder forEgress(TypeName targetEgressId) {
+ Objects.requireNonNull(targetEgressId);
+ return new Builder(targetEgressId);
+ }
+
+ /**
+ * Collects the stream, the partition key, the value and the optional explicit hash key of a
+ * Kinesis record. The stream, the partition key and the value are mandatory, see {@link
+ * #build()}.
+ */
+ public static final class Builder {
+ private static final TypeName KINESIS_PRODUCER_RECORD_TYPENAME =
+ TypeName.typeNameOf(
+ "type.googleapis.com", KinesisEgressRecord.getDescriptor().getFullName());
+
+ private final TypeName targetEgressId;
+ private ByteString targetStreamBytes;
+ private ByteString partitionKeyBytes;
+ private ByteString valueBytes;
+ private ByteString explicitHashKey;
+
+ private Builder(TypeName targetEgressId) {
+ this.targetEgressId = targetEgressId;
+ }
+
+ /** Sets the target stream to the UTF-8 bytes of {@code stream}. */
+ public Builder withStream(String stream) {
+ Objects.requireNonNull(stream);
+ this.targetStreamBytes = ByteString.copyFromUtf8(stream);
+ return this;
+ }
+
+ /** Sets the target stream to the bytes of {@code stream}; {@code stream} must not be null. */
+ public Builder withStream(Slice stream) {
+ // null-check like every other setter of this builder.
+ Objects.requireNonNull(stream);
+ this.targetStreamBytes = SliceProtobufUtil.asByteString(stream);
+ return this;
+ }
+
+ /** Sets the partition key to the UTF-8 bytes of {@code key}. */
+ public Builder withUtf8PartitionKey(String key) {
+ Objects.requireNonNull(key);
+ this.partitionKeyBytes = ByteString.copyFromUtf8(key);
+ return this;
+ }
+
+ /** Sets the partition key to a copy of {@code key}. */
+ public Builder withPartitionKey(byte[] key) {
+ Objects.requireNonNull(key);
+ this.partitionKeyBytes = ByteString.copyFrom(key);
+ return this;
+ }
+
+ /** Sets the partition key to the bytes of {@code key}. */
+ public Builder withPartitionKey(Slice key) {
+ Objects.requireNonNull(key);
+ this.partitionKeyBytes = SliceProtobufUtil.asByteString(key);
+ return this;
+ }
+
+ /** Sets the record value to the UTF-8 bytes of {@code value}. */
+ public Builder withUtf8Value(String value) {
+ Objects.requireNonNull(value);
+ this.valueBytes = ByteString.copyFromUtf8(value);
+ return this;
+ }
+
+ /** Sets the record value to a copy of {@code value}. */
+ public Builder withValue(byte[] value) {
+ Objects.requireNonNull(value);
+ this.valueBytes = ByteString.copyFrom(value);
+ return this;
+ }
+
+ /** Sets the record value to the bytes of {@code value}. */
+ public Builder withValue(Slice value) {
+ Objects.requireNonNull(value);
+ this.valueBytes = SliceProtobufUtil.asByteString(value);
+ return this;
+ }
+
+ /** Sets the record value to {@code value}, serialized with the serializer of {@code type}. */
+ public <T> Builder withValue(Type<T> type, T value) {
+ TypeSerializer<T> serializer = type.typeSerializer();
+ return withValue(serializer.serialize(value));
+ }
+
+ /** Sets the explicit hash key to the UTF-8 bytes of {@code value}. */
+ public Builder withUtf8ExplicitHashKey(String value) {
+ Objects.requireNonNull(value);
+ this.explicitHashKey = ByteString.copyFromUtf8(value);
+ return this;
+ }
+
+ /** Sets the explicit hash key to the bytes of {@code utf8Slice}. */
+ public Builder withUtf8ExplicitHashKey(Slice utf8Slice) {
+ Objects.requireNonNull(utf8Slice);
+ this.explicitHashKey = SliceProtobufUtil.asByteString(utf8Slice);
+ return this;
+ }
+
+ /**
+ * Builds the egress message. The explicit hash key is only set on the record if one was
+ * provided.
+ *
+ * @throws IllegalStateException if the stream, the partition key or the value is missing.
+ */
+ public EgressMessage build() {
+ KinesisEgressRecord.Builder builder = KinesisEgressRecord.newBuilder();
+ if (targetStreamBytes == null) {
+ throw new IllegalStateException("Missing destination Kinesis stream");
+ }
+ builder.setStreamBytes(targetStreamBytes);
+ if (partitionKeyBytes == null) {
+ throw new IllegalStateException("Missing partition key");
+ }
+ builder.setPartitionKeyBytes(partitionKeyBytes);
+ if (valueBytes == null) {
+ throw new IllegalStateException("Missing value");
+ }
+ builder.setValueBytes(valueBytes);
+ if (explicitHashKey != null) {
+ builder.setExplicitHashKeyBytes(explicitHashKey);
+ }
+
+ TypedValue typedValue =
+ TypedValue.newBuilder()
+ .setTypenameBytes(ApiExtension.typeNameByteString(KINESIS_PRODUCER_RECORD_TYPENAME))
+ .setValue(builder.build().toByteString())
+ .build();
+
+ return new EgressMessageWrapper(targetEgressId, typedValue);
+ }
+ }
+ }
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java
new file mode 100644
index 0000000..0b8be7b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.message;
+
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+/**
+ * A message destined for an egress: a target egress identifier together with a typed, serialized
+ * value.
+ */
+public interface EgressMessage {
+
+  /** @return the {@link TypeName} identifying the egress this message is sent to. */
+  TypeName targetEgressId();
+
+  /** @return the {@link TypeName} describing the type of the carried value. */
+  TypeName egressMessageValueType();
+
+  /** @return the serialized bytes of the carried value. */
+  Slice egressMessageValueBytes();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java
new file mode 100644
index 0000000..9c849a2
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * An {@link EgressMessage} that is backed by a Protobuf {@link TypedValue} and the identifier of
+ * the egress it is sent to.
+ */
+@Internal
+public final class EgressMessageWrapper implements EgressMessage {
+  private final TypeName targetEgressId;
+  private final TypedValue typedValue;
+
+  /**
+   * @param targetEgressId the egress this message is sent to, must not be {@code null}.
+   * @param actualMessage the typed value carried by this message, must not be {@code null}.
+   * @throws NullPointerException if any argument is {@code null}.
+   */
+  public EgressMessageWrapper(TypeName targetEgressId, TypedValue actualMessage) {
+    Objects.requireNonNull(targetEgressId);
+    Objects.requireNonNull(actualMessage);
+    this.targetEgressId = targetEgressId;
+    this.typedValue = actualMessage;
+  }
+
+  @Override
+  public TypeName targetEgressId() {
+    return targetEgressId;
+  }
+
+  /** Parses the type name string stored in the underlying {@link TypedValue} on every call. */
+  @Override
+  public TypeName egressMessageValueType() {
+    final String typeNameString = typedValue.getTypename();
+    return TypeName.typeNameFromString(typeNameString);
+  }
+
+  /** Exposes the value bytes of the underlying {@link TypedValue} as a {@link Slice}. */
+  @Override
+  public Slice egressMessageValueBytes() {
+    final Slice valueBytes = SliceProtobufUtil.asSlice(typedValue.getValue());
+    return valueBytes;
+  }
+
+  /** @return the underlying Protobuf {@link TypedValue}. */
+  public TypedValue typedValue() {
+    return typedValue;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java
new file mode 100644
index 0000000..e2e3467
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.message;
+
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.types.Type;
+
+/**
+ * A message addressed to a function: a target {@link Address} plus a typed, serialized value.
+ *
+ * <p>Each {@code isXyz()} method tests the type of the carried value and the matching {@code
+ * asXyz()} method decodes it. In {@code MessageWrapper} the {@code asXyz()} methods do not verify
+ * the type themselves, so callers are expected to test with {@code isXyz()} first.
+ */
+public interface Message {
+
+  /** @return the address of the function this message is sent to. */
+  Address targetAddress();
+
+  // ---- long ----
+
+  boolean isLong();
+
+  long asLong();
+
+  // ---- UTF-8 string ----
+
+  boolean isUtf8String();
+
+  String asUtf8String();
+
+  // ---- int ----
+
+  boolean isInt();
+
+  int asInt();
+
+  // ---- boolean ----
+
+  boolean isBoolean();
+
+  boolean asBoolean();
+
+  // ---- float ----
+
+  boolean isFloat();
+
+  float asFloat();
+
+  // ---- double ----
+
+  boolean isDouble();
+
+  double asDouble();
+
+  // ---- arbitrary types ----
+
+  /**
+   * @param type the type to test against.
+   * @param <T> the Java type described by {@code type}.
+   * @return whether the carried value is of the given type.
+   */
+  <T> boolean is(Type<T> type);
+
+  /**
+   * @param type the type used to decode the carried value.
+   * @param <T> the Java type described by {@code type}.
+   * @return the carried value, decoded with the given type.
+   */
+  <T> T as(Type<T> type);
+
+  /** @return the {@link TypeName} of the carried value. */
+  TypeName valueTypeName();
+
+  /** @return the serialized bytes of the carried value. */
+  Slice rawValue();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java
new file mode 100644
index 0000000..cd9f738
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+public final class MessageBuilder {
+ private final TypedValue.Builder builder;
+ private Address targetAddress;
+
+ private MessageBuilder(TypeName functionType, String id) {
+ this(functionType, id, TypedValue.newBuilder());
+ }
+
+ private MessageBuilder(TypeName functionType, String id, TypedValue.Builder builder) {
+ this.targetAddress = new Address(functionType, id);
+ this.builder = Objects.requireNonNull(builder);
+ }
+
+ public static MessageBuilder forAddress(TypeName functionType, String id) {
+ return new MessageBuilder(functionType, id);
+ }
+
+ public static MessageBuilder forAddress(Address address) {
+ Objects.requireNonNull(address);
+ return new MessageBuilder(address.type(), address.id());
+ }
+
+ public static MessageBuilder fromMessage(Message message) {
+ Address targetAddress = message.targetAddress();
+ TypedValue.Builder builder = typedValueBuilder(message);
+ return new MessageBuilder(targetAddress.type(), targetAddress.id(), builder);
+ }
+
+ public MessageBuilder withValue(long value) {
+ return withCustomType(Types.longType(), value);
+ }
+
+ public MessageBuilder withValue(int value) {
+ return withCustomType(Types.integerType(), value);
+ }
+
+ public MessageBuilder withValue(boolean value) {
+ return withCustomType(Types.booleanType(), value);
+ }
+
+ public MessageBuilder withValue(String value) {
+ return withCustomType(Types.stringType(), value);
+ }
+
+ public MessageBuilder withValue(float value) {
+ return withCustomType(Types.floatType(), value);
+ }
+
+ public MessageBuilder withValue(double value) {
+ return withCustomType(Types.doubleType(), value);
+ }
+
+ public MessageBuilder withTargetAddress(Address targetAddress) {
+ this.targetAddress = Objects.requireNonNull(targetAddress);
+ return this;
+ }
+
+ public MessageBuilder withTargetAddress(TypeName typeName, String id) {
+ return withTargetAddress(new Address(typeName, id));
+ }
+
+ public <T> MessageBuilder withCustomType(Type<T> customType, T element) {
+ Objects.requireNonNull(customType);
+ Objects.requireNonNull(element);
+ TypeSerializer<T> typeSerializer = customType.typeSerializer();
+ builder.setTypenameBytes(ApiExtension.typeNameByteString(customType.typeName()));
+ Slice serialized = typeSerializer.serialize(element);
+ ByteString serializedByteString = SliceProtobufUtil.asByteString(serialized);
+ builder.setValue(serializedByteString);
+ return this;
+ }
+
+ public Message build() {
+ return new MessageWrapper(targetAddress, builder.build());
+ }
+
+ private static TypedValue.Builder typedValueBuilder(Message message) {
+ ByteString typenameBytes = ApiExtension.typeNameByteString(message.valueTypeName());
+ ByteString valueBytes = SliceProtobufUtil.asByteString(message.rawValue());
+ return TypedValue.newBuilder().setTypenameBytes(typenameBytes).setValue(valueBytes);
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java
new file mode 100644
index 0000000..74c5955
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * A {@link Message} backed by a Protobuf {@link TypedValue} and a target {@link Address}.
+ *
+ * <p>The {@code isXyz()} methods compare type names only, and the {@code asXyz()} methods
+ * deserialize the value bytes without checking the type name first.
+ */
+@Internal
+public final class MessageWrapper implements Message {
+  private final Address targetAddress;
+  private final TypedValue typedValue;
+
+  /** @throws NullPointerException if any argument is {@code null}. */
+  public MessageWrapper(Address targetAddress, TypedValue typedValue) {
+    Objects.requireNonNull(targetAddress);
+    Objects.requireNonNull(typedValue);
+    this.targetAddress = targetAddress;
+    this.typedValue = typedValue;
+  }
+
+  @Override
+  public Address targetAddress() {
+    return targetAddress;
+  }
+
+  // ---------------------------------------------------------------------------
+  // built-in types, all expressed through is(Type) and as(Type)
+  // ---------------------------------------------------------------------------
+
+  @Override
+  public boolean isLong() {
+    return is(Types.longType());
+  }
+
+  @Override
+  public long asLong() {
+    return as(Types.longType());
+  }
+
+  @Override
+  public boolean isUtf8String() {
+    return is(Types.stringType());
+  }
+
+  @Override
+  public String asUtf8String() {
+    return as(Types.stringType());
+  }
+
+  @Override
+  public boolean isInt() {
+    return is(Types.integerType());
+  }
+
+  @Override
+  public int asInt() {
+    return as(Types.integerType());
+  }
+
+  @Override
+  public boolean isBoolean() {
+    return is(Types.booleanType());
+  }
+
+  @Override
+  public boolean asBoolean() {
+    return as(Types.booleanType());
+  }
+
+  @Override
+  public boolean isFloat() {
+    return is(Types.floatType());
+  }
+
+  @Override
+  public float asFloat() {
+    return as(Types.floatType());
+  }
+
+  @Override
+  public boolean isDouble() {
+    return is(Types.doubleType());
+  }
+
+  @Override
+  public double asDouble() {
+    return as(Types.doubleType());
+  }
+
+  // ---------------------------------------------------------------------------
+  // arbitrary types
+  // ---------------------------------------------------------------------------
+
+  /** Tests whether the type name string of the value equals the type name string of the type. */
+  @Override
+  public <T> boolean is(Type<T> type) {
+    return typedValue.getTypename().equals(type.typeName().asTypeNameString());
+  }
+
+  /** Deserializes the value bytes with the serializer of the given type. */
+  @Override
+  public <T> T as(Type<T> type) {
+    final TypeSerializer<T> serializer = type.typeSerializer();
+    return serializer.deserialize(SliceProtobufUtil.asSlice(typedValue.getValue()));
+  }
+
+  @Override
+  public TypeName valueTypeName() {
+    final String typeNameString = typedValue.getTypename();
+    return TypeName.typeNameFromString(typeNameString);
+  }
+
+  @Override
+  public Slice rawValue() {
+    final Slice valueBytes = SliceProtobufUtil.asSlice(typedValue.getValue());
+    return valueBytes;
+  }
+
+  /** @return the underlying Protobuf {@link TypedValue}. */
+  public TypedValue typedValue() {
+    return typedValue;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java
new file mode 100644
index 0000000..c28aa58
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** A {@link Slice} that delegates every operation to a Protobuf {@link ByteString}. */
+final class ByteStringSlice implements Slice {
+  private final ByteString byteString;
+
+  /** @throws NullPointerException if {@code bytes} is {@code null}. */
+  public ByteStringSlice(ByteString bytes) {
+    Objects.requireNonNull(bytes);
+    this.byteString = bytes;
+  }
+
+  /** @return the {@link ByteString} backing this slice. */
+  public ByteString byteString() {
+    return byteString;
+  }
+
+  @Override
+  public int readableBytes() {
+    return byteString.size();
+  }
+
+  @Override
+  public byte byteAt(int position) {
+    return byteString.byteAt(position);
+  }
+
+  @Override
+  public ByteBuffer asReadOnlyByteBuffer() {
+    return byteString.asReadOnlyByteBuffer();
+  }
+
+  @Override
+  public byte[] toByteArray() {
+    return byteString.toByteArray();
+  }
+
+  @Override
+  public void copyTo(byte[] target) {
+    byteString.copyTo(target, 0);
+  }
+
+  @Override
+  public void copyTo(byte[] target, int targetOffset) {
+    byteString.copyTo(target, targetOffset);
+  }
+
+  @Override
+  public void copyTo(ByteBuffer buffer) {
+    byteString.copyTo(buffer);
+  }
+
+  /** Writes the bytes to the stream; an {@link IOException} is rethrown unchecked. */
+  @Override
+  public void copyTo(OutputStream outputStream) {
+    try {
+      byteString.writeTo(outputStream);
+    } catch (IOException cause) {
+      throw new IllegalStateException(cause);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java
new file mode 100644
index 0000000..8c6187e
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/** A read-only sequence of bytes. */
+public interface Slice {
+
+  /** @return the number of bytes in this slice. */
+  int readableBytes();
+
+  /** @return the byte at the given position. */
+  byte byteAt(int position);
+
+  /** Copies the bytes of this slice into the given buffer. */
+  void copyTo(ByteBuffer target);
+
+  /** Copies the bytes of this slice into the given array, starting at index 0 of the array. */
+  void copyTo(byte[] target);
+
+  /** Copies the bytes of this slice into the given array, starting at {@code targetOffset}. */
+  void copyTo(byte[] target, int targetOffset);
+
+  /** Writes the bytes of this slice to the given stream. */
+  void copyTo(OutputStream outputStream);
+
+  /** @return a read-only {@link ByteBuffer} over the bytes of this slice. */
+  ByteBuffer asReadOnlyByteBuffer();
+
+  /** @return the bytes of this slice as an array. */
+  byte[] toByteArray();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java
new file mode 100644
index 0000000..2573e4f
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+/**
+ * An append-only, growable byte buffer whose content can be obtained as a {@link Slice}.
+ *
+ * <p>Bytes are appended to an internal array that is enlarged on demand. This class performs no
+ * synchronization.
+ */
+public final class SliceOutput {
+  /** The minimal free space made available before each read in {@link #writeFully}. */
+  private static final int READ_CHUNK_SIZE = 256;
+
+  /** A conservative upper bound for array sizes; some VMs reserve header words in arrays. */
+  private static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
+
+  private byte[] buf;
+  private int position;
+
+  /**
+   * Creates an empty output.
+   *
+   * @param initialSize the initial capacity of the internal array, in bytes.
+   * @throws IllegalArgumentException if {@code initialSize} is negative.
+   */
+  public static SliceOutput sliceOutput(int initialSize) {
+    return new SliceOutput(initialSize);
+  }
+
+  private SliceOutput(int initialSize) {
+    if (initialSize < 0) {
+      throw new IllegalArgumentException("initial size has to be non negative");
+    }
+    this.buf = new byte[initialSize];
+    this.position = 0;
+  }
+
+  /** Appends a single byte. */
+  public void write(byte b) {
+    ensureCapacity(1);
+    buf[position] = b;
+    position++;
+  }
+
+  /**
+   * Appends the whole array.
+   *
+   * @throws NullPointerException if {@code buffer} is {@code null}.
+   */
+  public void write(byte[] buffer) {
+    Objects.requireNonNull(buffer);
+    write(buffer, 0, buffer.length);
+  }
+
+  /**
+   * Appends {@code len} bytes of {@code buffer}, starting at {@code offset}.
+   *
+   * @throws NullPointerException if {@code buffer} is {@code null}.
+   * @throws IllegalArgumentException if the requested range does not lie within {@code buffer}.
+   */
+  public void write(byte[] buffer, int offset, int len) {
+    Objects.requireNonNull(buffer);
+    if (offset < 0 || offset > buffer.length) {
+      throw new IllegalArgumentException("Offset out of range " + offset);
+    }
+    if (len < 0) {
+      throw new IllegalArgumentException("Negative length " + len);
+    }
+    // Reject an over-long range before growing the internal array. Written as a subtraction so
+    // that the check itself cannot overflow.
+    if (len > buffer.length - offset) {
+      throw new IllegalArgumentException("Length out of range " + len);
+    }
+    ensureCapacity(len);
+    System.arraycopy(buffer, offset, buf, position, len);
+    position += len;
+  }
+
+  /** Appends the remaining bytes of the given buffer; this advances the buffer's position. */
+  public void write(ByteBuffer buffer) {
+    Objects.requireNonNull(buffer);
+    final int n = buffer.remaining();
+    ensureCapacity(n);
+    buffer.get(buf, position, n);
+    position += n;
+  }
+
+  /** Appends the bytes of the given slice. */
+  public void write(Slice slice) {
+    write(slice.asReadOnlyByteBuffer());
+  }
+
+  /**
+   * Appends everything that can be read from {@code input} until end of stream. The stream is not
+   * closed by this method.
+   *
+   * @throws NullPointerException if {@code input} is {@code null}.
+   * @throws IllegalStateException if reading fails with an {@link IOException}.
+   */
+  public void writeFully(InputStream input) {
+    Objects.requireNonNull(input);
+    try {
+      while (true) {
+        // Guarantees a non-empty destination range, so read() never gets a zero length request.
+        ensureCapacity(READ_CHUNK_SIZE);
+        final int bytesRead = input.read(buf, position, remaining());
+        if (bytesRead < 0) {
+          return; // end of stream
+        }
+        position += bytesRead;
+      }
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  /** @return a slice holding a copy of the bytes written so far. */
+  public Slice copyOf() {
+    return Slices.copyOf(buf, 0, position);
+  }
+
+  /**
+   * @return a slice over the bytes written so far, created with {@code Slices.wrap} on the
+   *     internal array rather than {@code Slices.copyOf}.
+   */
+  public Slice view() {
+    return Slices.wrap(buf, 0, position);
+  }
+
+  private int remaining() {
+    return buf.length - position;
+  }
+
+  /**
+   * Makes sure that at least {@code bytesNeeded} more bytes fit into the internal array, growing
+   * it to twice the required size (capped at {@link #MAX_ARRAY_SIZE}) when they do not.
+   *
+   * @throws IllegalStateException if the required size exceeds the maximal array size.
+   */
+  private void ensureCapacity(final int bytesNeeded) {
+    // Compare via subtraction: position + bytesNeeded could overflow to a negative number.
+    if (bytesNeeded > MAX_ARRAY_SIZE - position) {
+      throw new IllegalStateException(
+          "Unable to grow the buffer by " + bytesNeeded + " bytes beyond " + position);
+    }
+    final int requiredNewLength = position + bytesNeeded;
+    if (requiredNewLength <= buf.length) {
+      return; // fits as is, including an exact fit
+    }
+    final int newLength =
+        requiredNewLength > MAX_ARRAY_SIZE / 2 ? MAX_ARRAY_SIZE : 2 * requiredNewLength;
+    this.buf = Arrays.copyOf(buf, newLength);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java
new file mode 100644
index 0000000..2a87b5b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.MoreByteStrings;
+import com.google.protobuf.Parser;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+
+/**
+ * Conversions between {@link Slice}s and Protobuf types. A slice that is a {@code
+ * ByteStringSlice} hands out its backing {@link ByteString} directly; any other slice goes
+ * through its read-only byte buffer.
+ */
+@Internal
+public final class SliceProtobufUtil {
+  private SliceProtobufUtil() {}
+
+  /**
+   * Parses a Protobuf message from the bytes of the given slice.
+   *
+   * @throws InvalidProtocolBufferException if the parser rejects the bytes.
+   */
+  public static <T> T parseFrom(Parser<T> parser, Slice slice)
+      throws InvalidProtocolBufferException {
+    if (!(slice instanceof ByteStringSlice)) {
+      return parser.parseFrom(slice.asReadOnlyByteBuffer());
+    }
+    final ByteString backing = ((ByteStringSlice) slice).byteString();
+    return parser.parseFrom(backing);
+  }
+
+  /** @return a slice over the serialized bytes of the given Protobuf message. */
+  public static Slice toSlice(Message message) {
+    final byte[] serialized = message.toByteArray();
+    return Slices.wrap(serialized);
+  }
+
+  /** @return the bytes of the given slice as a {@link ByteString}. */
+  public static ByteString asByteString(Slice slice) {
+    return (slice instanceof ByteStringSlice)
+        ? ((ByteStringSlice) slice).byteString()
+        : MoreByteStrings.wrap(slice.asReadOnlyByteBuffer());
+  }
+
+  /** @return a slice backed by the given {@link ByteString}. */
+  public static Slice asSlice(ByteString byteString) {
+    return new ByteStringSlice(byteString);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java
new file mode 100644
index 0000000..7f42291
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.MoreByteStrings;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Static factories for {@link Slice}s. The {@code wrap} methods go through {@code
+ * MoreByteStrings.wrap}, the {@code copy} methods through the copying factories of {@link
+ * ByteString}.
+ */
+public final class Slices {
+  private Slices() {}
+
+  // ---- wrapping factories ----
+
+  public static Slice wrap(ByteBuffer buffer) {
+    final ByteString wrapped = MoreByteStrings.wrap(buffer);
+    return sliceOf(wrapped);
+  }
+
+  public static Slice wrap(byte[] bytes) {
+    final ByteString wrapped = MoreByteStrings.wrap(bytes);
+    return sliceOf(wrapped);
+  }
+
+  public static Slice wrap(byte[] bytes, int offset, int len) {
+    final ByteString wrapped = MoreByteStrings.wrap(bytes, offset, len);
+    return sliceOf(wrapped);
+  }
+
+  // ---- copying factories ----
+
+  public static Slice copyOf(byte[] bytes) {
+    final ByteString copy = ByteString.copyFrom(bytes);
+    return sliceOf(copy);
+  }
+
+  public static Slice copyOf(byte[] bytes, int offset, int len) {
+    final ByteString copy = ByteString.copyFrom(bytes, offset, len);
+    return sliceOf(copy);
+  }
+
+  /**
+   * Reads the given stream until its end and returns the bytes read as a slice.
+   *
+   * @param inputStream the stream to drain.
+   * @param expectedStreamSize the initial size of the buffer the stream is read into.
+   */
+  public static Slice copyOf(InputStream inputStream, int expectedStreamSize) {
+    final SliceOutput drained = SliceOutput.sliceOutput(expectedStreamSize);
+    drained.writeFully(inputStream);
+    // The output is local to this method, so its content is handed out as a view.
+    return drained.view();
+  }
+
+  /** @return a slice holding the UTF-8 encoding of the given string. */
+  public static Slice copyFromUtf8(String input) {
+    final ByteString utf8 = ByteString.copyFromUtf8(input);
+    return sliceOf(utf8);
+  }
+
+  private static Slice sliceOf(ByteString bytes) {
+    return new ByteStringSlice(bytes);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
new file mode 100644
index 0000000..d692c13
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.TypeCharacteristics;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+@Internal
+public final class ConcurrentAddressScopedStorage implements AddressScopedStorage {
+
+ private final List<Cell<?>> cells;
+
+ public ConcurrentAddressScopedStorage(List<StateValueContext<?>> stateValues) {
+ this.cells = createCells(stateValues);
+ }
+
+ @Override
+ public <T> Optional<T> get(ValueSpec<T> valueSpec) {
+ final Cell<T> cell = getCellOrThrow(valueSpec);
+ return cell.get();
+ }
+
+ @Override
+ public <T> void set(ValueSpec<T> valueSpec, T value) {
+ final Cell<T> cell = getCellOrThrow(valueSpec);
+ cell.set(value);
+ }
+
+ @Override
+ public <T> void remove(ValueSpec<T> valueSpec) {
+ final Cell<T> cell = getCellOrThrow(valueSpec);
+ cell.remove();
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> Cell<T> getCellOrThrow(ValueSpec<T> runtimeSpec) {
+ // fast path: the user used the same ValueSpec reference to declare the function
+ // and to index into the state.
+ for (Cell<?> cell : cells) {
+ ValueSpec<?> registeredSpec = cell.spec();
+ if (runtimeSpec == registeredSpec) {
+ return (Cell<T>) cell;
+ }
+ }
+ return slowGetCellOrThrow(runtimeSpec);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> Cell<T> slowGetCellOrThrow(ValueSpec<T> valueSpec) {
+ // unlikely slow path: when the users used a different ValueSpec instance in registration
+ // and at runtime.
+ for (Cell<?> cell : cells) {
+ ValueSpec<?> thisSpec = cell.spec();
+ String thisName = thisSpec.name();
+ if (!thisName.equals(valueSpec.name())) {
+ continue;
+ }
+ if (thisSpec.typeName().equals(valueSpec.typeName())) {
+ return (Cell<T>) cell;
+ }
+ throw new IllegalStorageAccessException(
+ valueSpec.name(),
+ "Accessed state with incorrect type; state type was registered as "
+ + thisSpec.typeName()
+ + ", but was accessed as type "
+ + valueSpec.typeName());
+ }
+ throw new IllegalStorageAccessException(
+ valueSpec.name(), "State does not exist; make sure that this state was registered.");
+ }
+
+ public void addMutations(Consumer<PersistedValueMutation> consumer) {
+ for (Cell<?> cell : cells) {
+ cell.toProtocolValueMutation().ifPresent(consumer);
+ }
+ }
+
+ // ===============================================================================
+ // Thread-safe state value cells
+ // ===============================================================================
+
+ private interface Cell<T> {
+ Optional<T> get();
+
+ void set(T value);
+
+ void remove();
+
+ Optional<FromFunction.PersistedValueMutation> toProtocolValueMutation();
+
+ ValueSpec<T> spec();
+ }
+
+ private static <T> Optional<T> tryDeserialize(
+ TypeSerializer<T> serializer, TypedValue typedValue) {
+ if (!typedValue.getHasValue()) {
+ return Optional.empty();
+ }
+ Slice slice = SliceProtobufUtil.asSlice(typedValue.getValue());
+ T value = serializer.deserialize(slice);
+ return Optional.ofNullable(value);
+ }
+
+ private static <T> ByteString serialize(TypeSerializer<T> serializer, T value) {
+ Slice slice = serializer.serialize(value);
+ return SliceProtobufUtil.asByteString(slice);
+ }
+
+ private static final class ImmutableTypeCell<T> implements Cell<T> {
+ private final ReentrantLock lock = new ReentrantLock();
+ private final ValueSpec<T> spec;
+ private final TypedValue typedValue;
+ private final TypeSerializer<T> serializer;
+
+ private CellStatus status = CellStatus.UNMODIFIED;
+ private T cachedObject;
+
+ public ImmutableTypeCell(ValueSpec<T> spec, TypedValue typedValue) {
+ this.spec = spec;
+ this.typedValue = typedValue;
+ this.serializer = Objects.requireNonNull(spec.type().typeSerializer());
+ }
+
+ @Override
+ public Optional<T> get() {
+ lock.lock();
+ try {
+ if (status == CellStatus.DELETED) {
+ return Optional.empty();
+ }
+ if (cachedObject != null) {
+ return Optional.of(cachedObject);
+ }
+ Optional<T> result = tryDeserialize(serializer, typedValue);
+ result.ifPresent(object -> this.cachedObject = object);
+ return result;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void set(T value) {
+ if (value == null) {
+ throw new IllegalStorageAccessException(
+ spec.name(), "Can not set state to NULL. Please use remove() instead.");
+ }
+ lock.lock();
+ try {
+ cachedObject = value;
+ status = CellStatus.MODIFIED;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void remove() {
+ lock.lock();
+ try {
+ cachedObject = null;
+ status = CellStatus.DELETED;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Optional<PersistedValueMutation> toProtocolValueMutation() {
+ final String typeNameString = spec.typeName().asTypeNameString();
+ switch (status) {
+ case MODIFIED:
+ final TypedValue.Builder newValue =
+ TypedValue.newBuilder()
+ .setTypename(typeNameString)
+ .setHasValue(true)
+ .setValue(serialize(serializer, cachedObject));
+
+ return Optional.of(
+ PersistedValueMutation.newBuilder()
+ .setStateName(spec.name())
+ .setMutationType(PersistedValueMutation.MutationType.MODIFY)
+ .setStateValue(newValue)
+ .build());
+ case DELETED:
+ return Optional.of(
+ PersistedValueMutation.newBuilder()
+ .setStateName(spec.name())
+ .setMutationType(PersistedValueMutation.MutationType.DELETE)
+ .build());
+ case UNMODIFIED:
+ return Optional.empty();
+ default:
+ throw new IllegalStateException("Unknown cell status: " + status);
+ }
+ }
+
+ @Override
+ public ValueSpec<T> spec() {
+ return spec;
+ }
+ }
+
+ private static final class MutableTypeCell<T> implements Cell<T> {
+ private final ReentrantLock lock = new ReentrantLock();
+
+ private final TypeSerializer<T> serializer;
+ private final ValueSpec<T> spec;
+ private TypedValue typedValue;
+ private CellStatus status = CellStatus.UNMODIFIED;
+
+ private MutableTypeCell(ValueSpec<T> spec, TypedValue typedValue) {
+ this.spec = spec;
+ this.typedValue = typedValue;
+ this.serializer = Objects.requireNonNull(spec.type().typeSerializer());
+ }
+
+ @Override
+ public Optional<T> get() {
+ lock.lock();
+ try {
+ if (status == CellStatus.DELETED) {
+ return Optional.empty();
+ }
+ return tryDeserialize(serializer, typedValue);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void set(T value) {
+ if (value == null) {
+ throw new IllegalStorageAccessException(
+ spec.name(), "Can not set state to NULL. Please use remove() instead.");
+ }
+ lock.lock();
+ try {
+ final TypedValue newTypedValue =
+ this.typedValue
+ .toBuilder()
+ .setHasValue(true)
+ .setValue(serialize(serializer, value))
+ .build();
+ this.typedValue = newTypedValue;
+ this.status = CellStatus.MODIFIED;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public void remove() {
+ lock.lock();
+ try {
+ status = CellStatus.DELETED;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ @Override
+ public Optional<PersistedValueMutation> toProtocolValueMutation() {
+ switch (status) {
+ case MODIFIED:
+ return Optional.of(
+ PersistedValueMutation.newBuilder()
+ .setStateName(spec.name())
+ .setMutationType(PersistedValueMutation.MutationType.MODIFY)
+ .setStateValue(typedValue)
+ .build());
+ case DELETED:
+ return Optional.of(
+ PersistedValueMutation.newBuilder()
+ .setStateName(spec.name())
+ .setMutationType(PersistedValueMutation.MutationType.DELETE)
+ .build());
+ case UNMODIFIED:
+ return Optional.empty();
+ default:
+ throw new IllegalStateException("Unknown cell status: " + status);
+ }
+ }
+
+ @Override
+ public ValueSpec<T> spec() {
+ return spec;
+ }
+ }
+
+ private enum CellStatus {
+ UNMODIFIED,
+ MODIFIED,
+ DELETED
+ }
+
+ private static List<Cell<?>> createCells(List<StateValueContext<?>> stateValues) {
+ final List<Cell<?>> cells = new ArrayList<>(stateValues.size());
+
+ for (StateValueContext<?> stateValueContext : stateValues) {
+ final TypedValue typedValue = stateValueContext.protocolValue().getStateValue();
+ final ValueSpec<?> spec = stateValueContext.spec();
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ final Cell<?> cell =
+ spec.type().typeCharacteristics().contains(TypeCharacteristics.IMMUTABLE_VALUES)
+ ? new ImmutableTypeCell(spec, typedValue)
+ : new MutableTypeCell(spec, typedValue);
+
+ cells.add(cell);
+ }
+
+ return cells;
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java
new file mode 100644
index 0000000..3a84840
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+public final class IllegalStorageAccessException extends RuntimeException {
+
+ private static final long serialVersionUID = 1;
+
+ protected IllegalStorageAccessException(String stateName, String message) {
+ super("Error accessing state " + stateName + "; " + message);
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java
new file mode 100644
index 0000000..009c7aa
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+
+/**
+ * Utility for pairing registered {@link ValueSpec}s with values provided by the protocol's {@link
+ * ToFunction} message.
+ */
+@Internal
+public final class StateValueContexts {
+
+ public static final class StateValueContext<T> {
+ private final ValueSpec<T> spec;
+ private final ToFunction.PersistedValue protocolValue;
+
+ StateValueContext(ValueSpec<T> spec, ToFunction.PersistedValue protocolValue) {
+ this.spec = Objects.requireNonNull(spec);
+ this.protocolValue = Objects.requireNonNull(protocolValue);
+ }
+
+ public ValueSpec<T> spec() {
+ return spec;
+ }
+
+ public ToFunction.PersistedValue protocolValue() {
+ return protocolValue;
+ }
+ }
+
+ public static final class ResolutionResult {
+ private final List<StateValueContext<?>> resolved;
+ private final List<ValueSpec<?>> missingValues;
+
+ private ResolutionResult(
+ List<StateValueContext<?>> resolved, List<ValueSpec<?>> missingValues) {
+ this.resolved = resolved;
+ this.missingValues = missingValues;
+ }
+
+ public boolean hasMissingValues() {
+ return missingValues != null;
+ }
+
+ public List<StateValueContext<?>> resolved() {
+ return resolved;
+ }
+
+ public List<ValueSpec<?>> missingValues() {
+ return missingValues;
+ }
+ }
+
+ /**
+ * Tries to resolve any registered states that are known to us by the {@link ValueSpec} with the
+ * states provided to us by the runtime.
+ */
+ public static ResolutionResult resolve(
+ Map<String, ValueSpec<?>> registeredSpecs,
+ List<ToFunction.PersistedValue> protocolProvidedValues) {
+
+ // holds a set of missing ValueSpec's. a missing ValueSpec is a value spec that was
+ // registered by the user but wasn't sent to the SDK by the runtime.
+ // this can happen upon an initial request.
+ List<ValueSpec<?>> statesWithMissingValue =
+ null; // optimize for normal execution, where states aren't missing.
+
+ // holds the StateValueContext that will be used to serialize and deserialize user state.
+ final List<StateValueContext<?>> resolvedStateValues = new ArrayList<>(registeredSpecs.size());
+
+ for (ValueSpec<?> spec : registeredSpecs.values()) {
+ ToFunction.PersistedValue persistedValue =
+ findPersistedValueByName(protocolProvidedValues, spec.name());
+
+ if (persistedValue != null) {
+ resolvedStateValues.add(new StateValueContext<>(spec, persistedValue));
+ } else {
+ // oh no. the runtime doesn't know (yet) about a state that was registered by the user.
+ // we need to collect these.
+ statesWithMissingValue =
+ (statesWithMissingValue != null)
+ ? statesWithMissingValue
+ : new ArrayList<>(registeredSpecs.size());
+ statesWithMissingValue.add(spec);
+ }
+ }
+
+ if (statesWithMissingValue == null) {
+ return new ResolutionResult(resolvedStateValues, null);
+ }
+ return new ResolutionResult(null, statesWithMissingValue);
+ }
+
+ /**
+ * finds a {@linkplain org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.PersistedValue}
+ * with a given name. The size of the list, in practice is expected to be very small (0-10) items.
+ * just use a plain linear search.
+ */
+ private static ToFunction.PersistedValue findPersistedValueByName(
+ List<ToFunction.PersistedValue> protocolProvidedValues, String name) {
+ for (ToFunction.PersistedValue value : protocolProvidedValues) {
+ if (Objects.equals(value.getStateName(), name)) {
+ return value;
+ }
+ }
+ return null;
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
new file mode 100644
index 0000000..71d5488
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+
+/**
+ * A {@link Type} defined by a {@link TypeName} and a pair of functions that convert values to and
+ * from a {@code byte[]}.
+ */
+public final class SimpleType<T> implements Type<T> {
+
+  /** A function from {@code I} to {@code O} that is allowed to throw. */
+  @FunctionalInterface
+  public interface Fn<I, O> {
+    O apply(I input) throws Throwable;
+  }
+
+  /** Creates a type without any {@link TypeCharacteristics}. */
+  public static <T> Type<T> simpleTypeFrom(
+      TypeName typeName, Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+    return new SimpleType<>(typeName, serialize, deserialize, Collections.emptySet());
+  }
+
+  /** Creates a type flagged with {@link TypeCharacteristics#IMMUTABLE_VALUES}. */
+  public static <T> Type<T> simpleImmutableTypeFrom(
+      TypeName typeName, Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+    return new SimpleType<>(
+        typeName, serialize, deserialize, EnumSet.of(TypeCharacteristics.IMMUTABLE_VALUES));
+  }
+
+  private final TypeName typeName;
+  private final TypeSerializer<T> serializer;
+  private final Set<TypeCharacteristics> typeCharacteristics;
+
+  /**
+   * @param typeName the name of this type.
+   * @param serialize converts a value to bytes.
+   * @param deserialize converts bytes back to a value.
+   * @param typeCharacteristics the characteristics of this type; may be empty. The set is copied.
+   * @throws NullPointerException if any of the arguments is {@code null}.
+   */
+  public SimpleType(
+      TypeName typeName,
+      Fn<T, byte[]> serialize,
+      Fn<byte[], T> deserialize,
+      Set<TypeCharacteristics> typeCharacteristics) {
+    this.typeName = Objects.requireNonNull(typeName);
+    this.serializer = new Serializer<>(serialize, deserialize);
+    // EnumSet.copyOf(Collection) rejects an empty collection that is not itself an EnumSet (such
+    // as the Collections.emptySet() passed by simpleTypeFrom), so copy into an empty set instead.
+    final EnumSet<TypeCharacteristics> copy = EnumSet.noneOf(TypeCharacteristics.class);
+    copy.addAll(Objects.requireNonNull(typeCharacteristics));
+    this.typeCharacteristics = Collections.unmodifiableSet(copy);
+  }
+
+  @Override
+  public TypeName typeName() {
+    return typeName;
+  }
+
+  @Override
+  public TypeSerializer<T> typeSerializer() {
+    return serializer;
+  }
+
+  /** Returns an unmodifiable view of this type's characteristics. */
+  @Override
+  public Set<TypeCharacteristics> typeCharacteristics() {
+    return typeCharacteristics;
+  }
+
+  /** Adapts the two user-provided functions to the {@link TypeSerializer} interface. */
+  private static final class Serializer<T> implements TypeSerializer<T> {
+    private final Fn<T, byte[]> serialize;
+    private final Fn<byte[], T> deserialize;
+
+    private Serializer(Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+      this.serialize = Objects.requireNonNull(serialize);
+      this.deserialize = Objects.requireNonNull(deserialize);
+    }
+
+    /**
+     * Applies the serialize function and wraps the bytes it returned.
+     *
+     * @throws IllegalStateException wrapping anything thrown by the function.
+     */
+    @Override
+    public Slice serialize(T value) {
+      try {
+        byte[] bytes = serialize.apply(value);
+        return Slices.wrap(bytes);
+      } catch (Throwable throwable) {
+        // Fn is declared to throw Throwable, hence the broad catch.
+        throw new IllegalStateException(throwable);
+      }
+    }
+
+    /**
+     * Applies the deserialize function to the bytes of {@code input}.
+     *
+     * @throws IllegalStateException wrapping anything thrown by the function.
+     */
+    @Override
+    public T deserialize(Slice input) {
+      try {
+        byte[] bytes = input.toByteArray();
+        return deserialize.apply(bytes);
+      } catch (Throwable throwable) {
+        // Fn is declared to throw Throwable, hence the broad catch.
+        throw new IllegalStateException(throwable);
+      }
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
new file mode 100644
index 0000000..7343b5a
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+
+/**
+ * Describes a value type {@code T}: the name that identifies it, the serializer that converts its
+ * values to and from bytes, and an optional set of characteristics.
+ */
+public interface Type<T> {
+
+ /** Returns the {@link TypeName} of this type. */
+ TypeName typeName();
+
+ /** Returns the serializer for values of this type. */
+ TypeSerializer<T> typeSerializer();
+
+ /** Returns the characteristics of this type; an empty set unless overridden. */
+ default Set<TypeCharacteristics> typeCharacteristics() {
+ return Collections.emptySet();
+ }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java
new file mode 100644
index 0000000..c29bde2
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+/** Optional properties that a {@code Type} may declare via {@code Type#typeCharacteristics()}. */
+public enum TypeCharacteristics {
+ // declared by types whose values are immutable.
+ IMMUTABLE_VALUES
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java
new file mode 100644
index 0000000..c60e802
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+/** Converts values of type {@code T} to and from a {@link Slice} of bytes. */
+public interface TypeSerializer<T> {
+
+ /** Serializes {@code value} into a {@link Slice}. */
+ Slice serialize(T value);
+
+ /** Deserializes a value out of the given {@link Slice}. */
+ T deserialize(Slice bytes);
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java
new file mode 100644
index 0000000..daa4fbf
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import static org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil.parseFrom;
+import static org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil.toSlice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MoreByteStrings;
+import com.google.protobuf.WireFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.types.generated.BooleanWrapper;
+import org.apache.flink.statefun.sdk.types.generated.DoubleWrapper;
+import org.apache.flink.statefun.sdk.types.generated.FloatWrapper;
+import org.apache.flink.statefun.sdk.types.generated.IntWrapper;
+import org.apache.flink.statefun.sdk.types.generated.LongWrapper;
+import org.apache.flink.statefun.sdk.types.generated.StringWrapper;
+
+public final class Types {
+ private Types() {}
+
+ // type names of the built-in primitive types, all under the "io.statefun.types" namespace.
+ public static final TypeName BOOLEAN_TYPENAME =
+ TypeName.typeNameFromString("io.statefun.types/bool");
+ public static final TypeName INTEGER_TYPENAME =
+ TypeName.typeNameFromString("io.statefun.types/int");
+ public static final TypeName FLOAT_TYPENAME =
+ TypeName.typeNameFromString("io.statefun.types/float");
+ public static final TypeName LONG_TYPENAME =
+ TypeName.typeNameFromString("io.statefun.types/long");
+ public static final TypeName DOUBLE_TYPENAME =
+ TypeName.typeNameFromString("io.statefun.types/double");
+ public static final TypeName STRING_TYPENAME =
+ TypeName.typeNameFromString("io.statefun.types/string");
+
+ // characteristics shared by the built-in types below: an unmodifiable set that contains only
+ // IMMUTABLE_VALUES.
+ private static final Set<TypeCharacteristics> IMMUTABLE_TYPE_CHARS =
+ Collections.unmodifiableSet(EnumSet.of(TypeCharacteristics.IMMUTABLE_VALUES));
+
+ /** Returns the built-in {@link Type} for {@link Long} values. */
+ public static Type<Long> longType() {
+ return LongType.INSTANCE;
+ }
+
+ /** Returns the built-in {@link Type} for {@link String} values. */
+ public static Type<String> stringType() {
+ return StringType.INSTANCE;
+ }
+
+ /** Returns the built-in {@link Type} for {@link Integer} values. */
+ public static Type<Integer> integerType() {
+ return IntegerType.INSTANCE;
+ }
+
+ /** Returns the built-in {@link Type} for {@link Boolean} values. */
+ public static Type<Boolean> booleanType() {
+ return BooleanType.INSTANCE;
+ }
+
+ /** Returns the built-in {@link Type} for {@link Float} values. */
+ public static Type<Float> floatType() {
+ return FloatType.INSTANCE;
+ }
+
+ /** Returns the built-in {@link Type} for {@link Double} values. */
+ public static Type<Double> doubleType() {
+ return DoubleType.INSTANCE;
+ }
+
+ /**
+  * Computes the Protobuf field tag, as specified by the Protobuf wire format. See {@code
+  * WireFormat#makeTag(int, int)}. NOTE: currently, for all StateFun provided wire types the tags
+  * should be 1 byte.
+  *
+  * <p>A tag is encoded as a VarInt, and a VarInt fits a single byte only when its value is within
+  * {@code [0, 127]}; any other value (including a negative one) is rejected.
+  *
+  * @param fieldNumber the field number as specified in the message definition.
+  * @param wireType the field type as specified in the message definition.
+  * @return the field tag as a single byte.
+  * @throws IllegalStateException if the tag can not be encoded as a single byte.
+  */
+ private static byte protobufTagAsSingleByte(int fieldNumber, int wireType) {
+   int fieldTag = (fieldNumber << 3) | wireType;
+   // the most significant bit of a VarInt byte is the continuation marker, so a single byte tag
+   // must keep it clear.
+   if (fieldTag < 0 || fieldTag > 127) {
+     throw new IllegalStateException(
+         "Protobuf Wrapper type compatibility is bigger than one byte.");
+   }
+   return (byte) fieldTag;
+ }
+
+ private static final Slice EMPTY_SLICE = Slices.wrap(new byte[] {});
+
+ /** The built-in {@link Type} for {@link Long} values, named {@link #LONG_TYPENAME}. */
+ private static final class LongType implements Type<Long> {
+
+   static final Type<Long> INSTANCE = new LongType();
+
+   private final TypeSerializer<Long> serializer = new LongTypeSerializer();
+
+   @Override
+   public TypeName typeName() {
+     return LONG_TYPENAME;
+   }
+
+   @Override
+   public TypeSerializer<Long> typeSerializer() {
+     return serializer;
+   }
+
+   /** Overrides the empty default of {@link Type} to declare the values as immutable. */
+   @Override
+   public Set<TypeCharacteristics> typeCharacteristics() {
+     return IMMUTABLE_TYPE_CHARS;
+   }
+ }
+
+ /**
+  * Serializer for {@link Long} values. A non-zero value is written as the one byte field tag of
+  * {@code LongWrapper}'s value field followed by the eight bytes of the value in little-endian
+  * order; zero is written as an empty slice.
+  */
+ private static final class LongTypeSerializer implements TypeSerializer<Long> {
+
+   private static final byte WRAPPER_TYPE_FIELD_TAG =
+       protobufTagAsSingleByte(LongWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_FIXED64);
+
+   @Override
+   public Slice serialize(Long element) {
+     final long n = element;
+     if (n == 0) {
+       return EMPTY_SLICE;
+     }
+     final byte[] encoded = new byte[1 + Long.BYTES];
+     encoded[0] = WRAPPER_TYPE_FIELD_TAG;
+     // least significant byte first.
+     for (int i = 0; i < Long.BYTES; i++) {
+       encoded[1 + i] = (byte) (n >>> (8 * i));
+     }
+     return Slices.wrap(encoded);
+   }
+
+   @Override
+   public Long deserialize(Slice input) {
+     if (input.readableBytes() == 0) {
+       return 0L;
+     }
+     if (input.byteAt(0) != WRAPPER_TYPE_FIELD_TAG) {
+       throw new IllegalStateException("Not a LongWrapper");
+     }
+     long decoded = 0;
+     // least significant byte first; mask each byte to undo sign extension.
+     for (int i = 0; i < Long.BYTES; i++) {
+       decoded |= (input.byteAt(1 + i) & 0xFFL) << (8 * i);
+     }
+     return decoded;
+   }
+ }
+
+ /** The built-in {@link Type} for {@link String} values, named {@link #STRING_TYPENAME}. */
+ private static final class StringType implements Type<String> {
+
+   static final Type<String> INSTANCE = new StringType();
+
+   private final TypeSerializer<String> serializer = new StringTypeSerializer();
+
+   @Override
+   public TypeName typeName() {
+     return STRING_TYPENAME;
+   }
+
+   @Override
+   public TypeSerializer<String> typeSerializer() {
+     return serializer;
+   }
+
+   /** Overrides the empty default of {@link Type} to declare the values as immutable. */
+   @Override
+   public Set<TypeCharacteristics> typeCharacteristics() {
+     return IMMUTABLE_TYPE_CHARS;
+   }
+ }
+
+ /**
+  * Serializer for {@link String} values. A non-empty string is written as the one byte field tag
+  * of {@code StringWrapper}'s value field, the length of its UTF-8 encoding as a VarInt, and then
+  * the UTF-8 bytes; the empty string is written as an empty slice.
+  */
+ private static final class StringTypeSerializer implements TypeSerializer<String> {
+
+   private static final byte STRING_WRAPPER_FIELD_TYPE =
+       protobufTagAsSingleByte(
+           StringWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+
+   /** The longest possible header: one tag byte, plus up to five bytes of VarInt length. */
+   private static final int MAX_HEADER_SIZE = 1 + 5;
+
+   @Override
+   public Slice serialize(String element) {
+     return serializeStringWrapperCompatibleString(element);
+   }
+
+   @Override
+   public String deserialize(Slice input) {
+     return deserializeStringWrapperCompatibleString(input);
+   }
+
+   private static Slice serializeStringWrapperCompatibleString(String element) {
+     if (element.isEmpty()) {
+       return EMPTY_SLICE;
+     }
+     ByteString utf8 = ByteString.copyFromUtf8(element);
+     // only the tag and the length are written here; the string bytes are concatenated below, so
+     // the header does not need any room for them.
+     byte[] header = new byte[MAX_HEADER_SIZE];
+     int position = 0;
+     // write the field tag
+     header[position++] = STRING_WRAPPER_FIELD_TYPE;
+     // write utf8 bytes.length as a VarInt.
+     int varIntLen = utf8.size();
+     while ((varIntLen & -128) != 0) {
+       header[position++] = ((byte) (varIntLen & 127 | 128));
+       varIntLen >>>= 7;
+     }
+     header[position++] = ((byte) varIntLen);
+     // concat header and the utf8 string bytes
+     ByteString headerBuf = MoreByteStrings.wrap(header, 0, position);
+     ByteString result = MoreByteStrings.concat(headerBuf, utf8);
+     return SliceProtobufUtil.asSlice(result);
+   }
+
+   private static String deserializeStringWrapperCompatibleString(Slice input) {
+     if (input.readableBytes() == 0) {
+       return "";
+     }
+     ByteString buf = SliceProtobufUtil.asByteString(input);
+     int position = 0;
+     // read field tag
+     if (buf.byteAt(position++) != STRING_WRAPPER_FIELD_TYPE) {
+       throw new IllegalStateException("Not a StringWrapper");
+     }
+     // read VarInt32 length: seven payload bits per byte, least significant group first, until
+     // a byte without the continuation bit is seen (at most five bytes are consumed).
+     int shift = 0;
+     long varIntSize = 0;
+     while (shift < 32) {
+       final byte b = buf.byteAt(position++);
+       varIntSize |= (long) (b & 0x7F) << shift;
+       if ((b & 0x80) == 0) {
+         break;
+       }
+       shift += 7;
+     }
+     // sanity checks
+     if (varIntSize < 0 || varIntSize > Integer.MAX_VALUE) {
+       throw new IllegalStateException("Malformed VarInt");
+     }
+     final int endIndex = position + (int) varIntSize;
+     ByteString utf8Bytes = buf.substring(position, endIndex);
+     return utf8Bytes.toStringUtf8();
+   }
+ }
+
+ /**
+  * Built-in {@link Type} descriptor for {@link Integer} values. It reports {@code
+  * INTEGER_TYPENAME} as its type name, serializes with an {@code IntegerTypeSerializer} and
+  * advertises the characteristics held in {@code IMMUTABLE_TYPE_CHARS}.
+  */
+ private static final class IntegerType implements Type<Integer> {
+
+   static final Type<Integer> INSTANCE = new IntegerType();
+
+   private final TypeSerializer<Integer> serializer = new IntegerTypeSerializer();
+
+   @Override
+   public TypeName typeName() {
+     return INTEGER_TYPENAME;
+   }
+
+   @Override
+   public TypeSerializer<Integer> typeSerializer() {
+     return serializer;
+   }
+
+   @Override
+   public Set<TypeCharacteristics> typeCharacteristics() {
+     return IMMUTABLE_TYPE_CHARS;
+   }
+ }
+
+ /**
+  * Serializer for {@code int} values using a fixed layout: the single-byte field tag followed by
+  * the four bytes of the value, least significant byte first. Zero is written as an empty slice
+  * and an empty slice is read back as zero.
+  */
+ private static final class IntegerTypeSerializer implements TypeSerializer<Integer> {
+   private static final byte WRAPPER_TYPE_FIELD_TYPE =
+       protobufTagAsSingleByte(IntWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_FIXED32);
+
+   @Override
+   public Slice serialize(Integer element) {
+     return serializeIntegerWrapperCompatibleInt(element);
+   }
+
+   @Override
+   public Integer deserialize(Slice input) {
+     return deserializeIntegerWrapperCompatibleInt(input);
+   }
+
+   /** Writes the tag and then the four value bytes in little-endian order. */
+   private static Slice serializeIntegerWrapperCompatibleInt(int n) {
+     if (n == 0) {
+       return EMPTY_SLICE;
+     }
+     final byte[] encoded = new byte[1 + Integer.BYTES];
+     encoded[0] = WRAPPER_TYPE_FIELD_TYPE;
+     for (int i = 0; i < Integer.BYTES; i++) {
+       // the narrowing cast keeps exactly the lowest eight bits of the shifted value.
+       encoded[i + 1] = (byte) (n >> (Byte.SIZE * i));
+     }
+     return Slices.wrap(encoded);
+   }
+
+   /**
+    * Reads back a value written by {@code serializeIntegerWrapperCompatibleInt}.
+    *
+    * @throws IllegalStateException if the first byte is not the expected field tag.
+    */
+   private static int deserializeIntegerWrapperCompatibleInt(Slice slice) {
+     if (slice.readableBytes() == 0) {
+       return 0;
+     }
+     if (slice.byteAt(0) != WRAPPER_TYPE_FIELD_TYPE) {
+       throw new IllegalStateException("Not an IntWrapper");
+     }
+     int decoded = 0;
+     for (int i = 0; i < Integer.BYTES; i++) {
+       decoded |= (slice.byteAt(i + 1) & 0xFF) << (Byte.SIZE * i);
+     }
+     return decoded;
+   }
+ }
+
+ /**
+  * Built-in {@link Type} descriptor for {@link Boolean} values. It reports {@code
+  * BOOLEAN_TYPENAME} as its type name, serializes with a {@code BooleanTypeSerializer} and
+  * advertises the characteristics held in {@code IMMUTABLE_TYPE_CHARS}.
+  */
+ private static final class BooleanType implements Type<Boolean> {
+
+   static final Type<Boolean> INSTANCE = new BooleanType();
+
+   private final TypeSerializer<Boolean> serializer = new BooleanTypeSerializer();
+
+   @Override
+   public TypeName typeName() {
+     return BOOLEAN_TYPENAME;
+   }
+
+   @Override
+   public TypeSerializer<Boolean> typeSerializer() {
+     return serializer;
+   }
+
+   @Override
+   public Set<TypeCharacteristics> typeCharacteristics() {
+     return IMMUTABLE_TYPE_CHARS;
+   }
+ }
+
+ private static final class BooleanTypeSerializer implements TypeSerializer<Boolean> {
+ private static final Slice TRUE_SLICE =
+ toSlice(BooleanWrapper.newBuilder().setValue(true).build());
+ private static final Slice FALSE_SLICE =
+ toSlice(BooleanWrapper.newBuilder().setValue(false).build());
+ private static final byte WRAPPER_TYPE_TAG =
+ protobufTagAsSingleByte(BooleanWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_VARINT);
+
+ @Override
+ public Slice serialize(Boolean element) {
+ return element ? TRUE_SLICE : FALSE_SLICE;
+ }
+
+ @Override
+ public Boolean deserialize(Slice input) {
+ if (input.readableBytes() == 0) {
+ return false;
+ }
+ final byte tag = input.byteAt(0);
+ final byte value = input.byteAt(1);
+ if (tag != WRAPPER_TYPE_TAG || value != (byte) 1) {
+ throw new IllegalStateException("Not a BooleanWrapper value.");
+ }
+ return true;
+ }
+ }
+
+ /**
+  * Built-in {@link Type} descriptor for {@link Float} values. It reports {@code FLOAT_TYPENAME}
+  * as its type name, serializes with a {@code FloatTypeSerializer} and advertises the
+  * characteristics held in {@code IMMUTABLE_TYPE_CHARS}.
+  */
+ private static final class FloatType implements Type<Float> {
+
+   static final Type<Float> INSTANCE = new FloatType();
+
+   private final TypeSerializer<Float> serializer = new FloatTypeSerializer();
+
+   @Override
+   public TypeName typeName() {
+     return FLOAT_TYPENAME;
+   }
+
+   @Override
+   public TypeSerializer<Float> typeSerializer() {
+     return serializer;
+   }
+
+   @Override
+   public Set<TypeCharacteristics> typeCharacteristics() {
+     return IMMUTABLE_TYPE_CHARS;
+   }
+ }
+
+ /** Serializer for {@code float} values that delegates the wire format to {@code FloatWrapper}. */
+ private static final class FloatTypeSerializer implements TypeSerializer<Float> {
+
+   @Override
+   public Slice serialize(Float element) {
+     return toSlice(FloatWrapper.newBuilder().setValue(element).build());
+   }
+
+   /**
+    * Parses the slice as a {@code FloatWrapper} and returns its value.
+    *
+    * @throws IllegalArgumentException if the slice is not a valid {@code FloatWrapper}; the
+    *     parsing failure is kept as the cause.
+    */
+   @Override
+   public Float deserialize(Slice input) {
+     final FloatWrapper parsed;
+     try {
+       parsed = parseFrom(FloatWrapper.parser(), input);
+     } catch (InvalidProtocolBufferException e) {
+       throw new IllegalArgumentException(e);
+     }
+     return parsed.getValue();
+   }
+ }
+
+ /**
+  * Built-in {@link Type} descriptor for {@link Double} values. It reports {@code DOUBLE_TYPENAME}
+  * as its type name, serializes with a {@code DoubleTypeSerializer} and advertises the
+  * characteristics held in {@code IMMUTABLE_TYPE_CHARS}.
+  */
+ private static final class DoubleType implements Type<Double> {
+
+   static final Type<Double> INSTANCE = new DoubleType();
+
+   private final TypeSerializer<Double> serializer = new DoubleTypeSerializer();
+
+   @Override
+   public TypeName typeName() {
+     return DOUBLE_TYPENAME;
+   }
+
+   @Override
+   public TypeSerializer<Double> typeSerializer() {
+     return serializer;
+   }
+
+   @Override
+   public Set<TypeCharacteristics> typeCharacteristics() {
+     return IMMUTABLE_TYPE_CHARS;
+   }
+ }
+
+ /**
+  * Serializer for {@code double} values that delegates the wire format to {@code DoubleWrapper}.
+  */
+ private static final class DoubleTypeSerializer implements TypeSerializer<Double> {
+
+   @Override
+   public Slice serialize(Double element) {
+     return toSlice(DoubleWrapper.newBuilder().setValue(element).build());
+   }
+
+   /**
+    * Parses the slice as a {@code DoubleWrapper} and returns its value.
+    *
+    * @throws IllegalArgumentException if the slice is not a valid {@code DoubleWrapper}; the
+    *     parsing failure is kept as the cause.
+    */
+   @Override
+   public Double deserialize(Slice input) {
+     final DoubleWrapper parsed;
+     try {
+       parsed = parseFrom(DoubleWrapper.parser(), input);
+     } catch (InvalidProtocolBufferException e) {
+       throw new IllegalArgumentException(e);
+     }
+     return parsed.getValue();
+   }
+ }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java
new file mode 100644
index 0000000..4c0a116
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.statefun.sdk.java.handler.TestUtils.modifiedValue;
+import static org.apache.flink.statefun.sdk.java.handler.TestUtils.protoFromValueSpec;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.handler.TestUtils.RequestBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.junit.Test;
+
+/**
+ * Drives a {@link ConcurrentRequestReplyHandler} that hosts a single greeter function and checks
+ * the {@link FromFunction} replies it produces.
+ */
+public class ConcurrentRequestReplyHandlerTest {
+
+  private static final TypeName GREETER_TYPE = TypeName.typeNameFromString("example/greeter");
+
+  private static final ValueSpec<Integer> SEEN_INT_SPEC =
+      ValueSpec.named("seen").thatExpiresAfterReadOrWrite(Duration.ofDays(1)).withIntType();
+
+  private static final StatefulFunctionSpec GREET_FN_SPEC =
+      StatefulFunctionSpec.builder(GREETER_TYPE)
+          .withValueSpec(SEEN_INT_SPEC)
+          .withSupplier(SimpleGreeter::new)
+          .build();
+
+  private final ConcurrentRequestReplyHandler handlerUnderTest =
+      new ConcurrentRequestReplyHandler(singletonMap(GREETER_TYPE, GREET_FN_SPEC));
+
+  /** A batch that carries the state and one message yields a reply. */
+  @Test
+  public void simpleInvocationExample() {
+    ToFunction batch =
+        greeterRequest()
+            .withState(SEEN_INT_SPEC, 1023)
+            .withInvocation(Types.stringType(), "Hello world")
+            .build();
+
+    FromFunction reply = invoke(batch);
+
+    assertThat(reply, notNullValue());
+  }
+
+  /** A batch that omits the registered state is answered with that state listed as missing. */
+  @Test
+  public void invocationWithoutStateDefinition() {
+    ToFunction batch =
+        greeterRequest().withInvocation(Types.stringType(), "Hello world").build();
+
+    FromFunction reply = invoke(batch);
+
+    assertThat(
+        reply.getIncompleteInvocationContext().getMissingValuesList(),
+        hasItem(protoFromValueSpec(SEEN_INT_SPEC)));
+  }
+
+  /** Three messages in one batch increment the counter three times. */
+  @Test
+  public void multipleInvocations() {
+    ToFunction batch =
+        greeterRequest()
+            .withState(SEEN_INT_SPEC, 0)
+            .withInvocation(Types.stringType(), "a")
+            .withInvocation(Types.stringType(), "b")
+            .withInvocation(Types.stringType(), "c")
+            .build();
+
+    FromFunction reply = invoke(batch);
+
+    assertThat(
+        reply.getInvocationResult().getStateMutationsList(),
+        hasItem(modifiedValue(SEEN_INT_SPEC, 3)));
+  }
+
+  /** Starts a request that targets the greeter function with id {@code "0"}. */
+  private static RequestBuilder greeterRequest() {
+    return new RequestBuilder().withTarget(GREETER_TYPE, "0");
+  }
+
+  /** Hands the batch to the handler under test and waits for its reply. */
+  private FromFunction invoke(ToFunction batch) {
+    return handlerUnderTest.handleInternally(batch).join();
+  }
+
+  /** Prints a greeting and increments the {@code seen} counter on every message. */
+  private static final class SimpleGreeter implements StatefulFunction {
+
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message argument) {
+      final int seenSoFar = context.storage().get(SEEN_INT_SPEC).orElse(0);
+      System.out.println("Hello there " + argument.asUtf8String() + " state=" + seenSoFar);
+
+      context.storage().set(SEEN_INT_SPEC, seenSoFar + 1);
+
+      return CompletableFuture.completedFuture(null);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java
new file mode 100644
index 0000000..d861d5a
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.MoreFutures.applySequentially;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.IntStream;
+import org.junit.Test;
+
+/** Tests for {@code MoreFutures.applySequentially}. */
+public class MoreFuturesTest {
+
+  /** The function's side effects are observed in the order of the input elements. */
+  @Test
+  public void usageExample() {
+    final ConcurrentLinkedDeque<String> results = new ConcurrentLinkedDeque<>();
+
+    CompletableFuture<?> allDone =
+        applySequentially(
+            Arrays.asList("a", "b", "c"),
+            letter -> CompletableFuture.runAsync(() -> results.addLast(letter)));
+
+    allDone.join();
+
+    assertThat(results, contains("a", "b", "c"));
+  }
+
+  /** A function that throws on the first element yields an exceptionally completed future. */
+  @Test
+  public void firstThrows() {
+    CompletableFuture<Void> allDone =
+        applySequentially(Arrays.asList("a", "b", "c"), throwing("a"));
+
+    assertThat(allDone.isCompletedExceptionally(), is(true));
+  }
+
+  /** A function that throws on the last element yields an exceptionally completed future. */
+  @Test
+  public void lastThrows() {
+    CompletableFuture<Void> allDone =
+        applySequentially(Arrays.asList("a", "b", "c"), throwing("c"));
+
+    assertThat(allDone.isCompletedExceptionally(), is(true));
+  }
+
+  /** A million already-completed steps must run to completion. */
+  @Test
+  public void bigList() {
+    Iterator<String> lotsOfInts =
+        IntStream.range(0, 1_000_000).mapToObj(String::valueOf).iterator();
+    Iterable<String> input = () -> lotsOfInts;
+
+    // no element equals null, so this function never throws.
+    CompletableFuture<Void> allDone = applySequentially(input, throwing(null));
+
+    allDone.join();
+  }
+
+  /**
+   * Creates a function that throws an {@link IllegalStateException} for the element equal to
+   * {@code when}, and returns an already completed future for every other element.
+   *
+   * @param when the element to throw on; {@code null} means the function never throws.
+   */
+  private static MoreFutures.Fn<String, CompletableFuture<Void>> throwing(String when) {
+    return letter -> {
+      if (letter.equals(when)) {
+        throw new IllegalStateException("I'm a throwing function");
+      }
+      return CompletableFuture.completedFuture(null);
+    };
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java
new file mode 100644
index 0000000..43cb4f1
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+
+/** Helpers for building protocol messages and matchers out of SDK level objects in tests. */
+public final class TestUtils {
+  private TestUtils() {}
+
+  /**
+   * Creates a matcher for a {@code MODIFY} mutation of the given state.
+   *
+   * @param spec the state that is expected to be modified.
+   * @param newValue the value the state is expected to be set to.
+   * @return a matcher that is equal to exactly that mutation.
+   */
+  public static <T> Matcher<FromFunction.PersistedValueMutation> modifiedValue(
+      ValueSpec<T> spec, T newValue) {
+    FromFunction.PersistedValueMutation mutation =
+        FromFunction.PersistedValueMutation.newBuilder()
+            .setStateName(spec.name())
+            .setMutationType(FromFunction.PersistedValueMutation.MutationType.MODIFY)
+            .setStateValue(typedValue(spec.type(), newValue))
+            .build();
+
+    return Matchers.is(mutation);
+  }
+
+  /** Converts a {@link ValueSpec} to its protocol representation. */
+  public static FromFunction.PersistedValueSpec protoFromValueSpec(ValueSpec<?> spec) {
+    return ProtoUtils.protoFromValueSpec(spec).build();
+  }
+
+  /**
+   * Builds a {@link TypedValue} for {@code value}, serialized with the serializer of {@code type}.
+   *
+   * @param type the type that supplies the type name and the serializer.
+   * @param value the value to wrap; {@code null} produces a typed value that is flagged as having
+   *     no value and whose payload is left unset.
+   * @return the builder, so that callers can pass it directly to other Protobuf builders.
+   */
+  public static <T> TypedValue.Builder typedValue(Type<T> type, T value) {
+    TypedValue.Builder typedValue =
+        TypedValue.newBuilder().setTypename(type.typeName().asTypeNameString());
+    if (value == null) {
+      // there is nothing to serialize; handing null to the serializer would fail before the
+      // absence of a value could be recorded.
+      return typedValue.setHasValue(false);
+    }
+    Slice serializedValue = type.typeSerializer().serialize(value);
+    return typedValue
+        .setHasValue(true)
+        .setValue(SliceProtobufUtil.asByteString(serializedValue));
+  }
+
+  /** A test utility to build ToFunction messages using SDK concepts. */
+  public static final class RequestBuilder {
+
+    private final ToFunction.InvocationBatchRequest.Builder builder =
+        ToFunction.InvocationBatchRequest.newBuilder();
+
+    /** Sets the address of the function that the batch is sent to. */
+    public RequestBuilder withTarget(TypeName target, String id) {
+      builder.setTarget(
+          Address.newBuilder().setNamespace(target.namespace()).setType(target.name()).setId(id));
+      return this;
+    }
+
+    /** Adds a state entry that only carries the name of the state, without a state value. */
+    public <T> RequestBuilder withState(ValueSpec<T> spec) {
+      builder.addState(ToFunction.PersistedValue.newBuilder().setStateName(spec.name()));
+
+      return this;
+    }
+
+    /** Adds a state entry that carries the given value. */
+    public <T> RequestBuilder withState(ValueSpec<T> spec, T value) {
+      builder.addState(
+          ToFunction.PersistedValue.newBuilder()
+              .setStateName(spec.name())
+              .setStateValue(typedValue(spec.type(), value)));
+
+      return this;
+    }
+
+    /** Appends an invocation whose argument is the given value. */
+    public <T> RequestBuilder withInvocation(Type<T> type, T value) {
+      builder.addInvocations(
+          ToFunction.Invocation.newBuilder().setArgument(typedValue(type, value)));
+      return this;
+    }
+
+    /** Wraps the accumulated batch in a {@link ToFunction} message. */
+    public ToFunction build() {
+      return ToFunction.newBuilder().setInvocation(builder).build();
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java
new file mode 100644
index 0000000..f0ba3c4
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.Test;
+
+/** Tests for writing bytes into a {@link SliceOutput} and reading them back from a {@link Slice}. */
+public class SliceOutputTest {
+
+  private static final byte[] TEST_BYTES = new byte[] {1, 2, 3, 4, 5};
+
+  @Test
+  public void usageExample() {
+    SliceOutput output = SliceOutput.sliceOutput(32);
+
+    output.write(TEST_BYTES);
+    Slice slice = output.copyOf();
+
+    assertArrayEquals(TEST_BYTES, slice.toByteArray());
+  }
+
+  /** Writing into an output that was created with an initial size of zero must still work. */
+  @Test
+  public void sliceShouldAutoGrow() {
+    SliceOutput sliceOutput = SliceOutput.sliceOutput(0);
+
+    sliceOutput.write(TEST_BYTES);
+    Slice slice = sliceOutput.copyOf();
+    byte[] got = new byte[slice.readableBytes()];
+    slice.copyTo(got);
+
+    // expected value first, so that a failure message labels the two arrays correctly.
+    assertArrayEquals(TEST_BYTES, got);
+  }
+
+  /** Copies the slice into a target array starting at index 1 instead of 0. */
+  @Test
+  public void writeStartingAtOffset() {
+    SliceOutput output = SliceOutput.sliceOutput(32);
+
+    output.write(TEST_BYTES);
+    Slice slice = output.copyOf();
+
+    byte[] slightlyBiggerBuf = new byte[1 + slice.readableBytes()];
+    slice.copyTo(slightlyBiggerBuf, 1);
+
+    byte[] got = deleteFirstByte(slightlyBiggerBuf);
+    assertArrayEquals(TEST_BYTES, got);
+  }
+
+  /** Writes random buffers in random (possibly empty) chunks and expects the same bytes back. */
+  @Test
+  public void randomizedTest() {
+    ThreadLocalRandom random = ThreadLocalRandom.current();
+    for (int i = 0; i < 1_000_000; i++) {
+      // create a random buffer of a random size.
+      final int size = random.nextInt(0, 32);
+      byte[] buf = randomBuffer(random, size);
+      // write the buffer in random chunks
+      SliceOutput sliceOutput = SliceOutput.sliceOutput(0);
+      for (OffsetLenPair chunk : randomChunks(random, size)) {
+        sliceOutput.write(buf, chunk.offset, chunk.len);
+      }
+      // verify
+      Slice slice = sliceOutput.copyOf();
+      assertArrayEquals(buf, slice.toByteArray());
+    }
+  }
+
+  @Test
+  public void copyFromInputStreamRandomizedTest() {
+    byte[] expected = randomBuffer(ThreadLocalRandom.current(), 256);
+    ByteArrayInputStream input = new ByteArrayInputStream(expected);
+
+    Slice slice = Slices.copyOf(input, 0);
+
+    byte[] actual = slice.toByteArray();
+
+    assertArrayEquals(expected, actual);
+  }
+
+  @Test
+  public void toOutputStreamTest() {
+    byte[] expected = randomBuffer(ThreadLocalRandom.current(), 256);
+
+    Slice slice = Slices.wrap(expected);
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream(expected.length);
+    slice.copyTo(output);
+
+    assertArrayEquals(expected, output.toByteArray());
+  }
+
+  /** Returns a copy of the given array without its first element. */
+  private static byte[] deleteFirstByte(byte[] slightlyBiggerBuf) {
+    return Arrays.copyOfRange(slightlyBiggerBuf, 1, slightlyBiggerBuf.length);
+  }
+
+  /** Creates an array of {@code size} random bytes. */
+  private static byte[] randomBuffer(ThreadLocalRandom random, int size) {
+    byte[] buf = new byte[size];
+    random.nextBytes(buf);
+    return buf;
+  }
+
+  /**
+   * Compute a random partition of {@code size} to pairs of (offset, len). A chunk may have a
+   * length of zero, in which case the next chunk starts at the same offset.
+   */
+  private static List<OffsetLenPair> randomChunks(ThreadLocalRandom random, int size) {
+    ArrayList<OffsetLenPair> chunks = new ArrayList<>();
+    int offset = 0;
+    while (offset != size) {
+      int remaining = size - offset;
+      int toWrite = random.nextInt(remaining + 1);
+      chunks.add(new OffsetLenPair(offset, toWrite));
+      offset += toWrite;
+    }
+    return chunks;
+  }
+
+  /** A region of a buffer: {@code len} bytes starting at {@code offset}. */
+  private static final class OffsetLenPair {
+    final int offset;
+    final int len;
+
+    public OffsetLenPair(int offset, int len) {
+      this.offset = offset;
+      this.len = len;
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java
new file mode 100644
index 0000000..85f21c2
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import static org.junit.Assert.assertSame;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+/** Tests the conversion between {@link ByteString} and {@link Slice}. */
+public class SliceProtobufUtilTest {
+
+  /** Converting to a slice and back must hand out the very same {@link ByteString} instance. */
+  @Test
+  public void usageExample() {
+    final ByteString original = ByteString.copyFromUtf8("Hello world");
+
+    final ByteString roundTripped =
+        SliceProtobufUtil.asByteString(SliceProtobufUtil.asSlice(original));
+
+    assertSame("Expecting the same reference.", original, roundTripped);
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java
new file mode 100644
index 0000000..0afaa02
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.junit.Test;
+
+/**
+ * Tests reading, writing and removing values through a {@link ConcurrentAddressScopedStorage},
+ * including the accesses that are expected to fail with an {@link
+ * IllegalStorageAccessException}.
+ */
+public class ConcurrentAddressScopedStorageTest {
+
+  @Test
+  public void exampleUsage() {
+    final ValueSpec<Integer> intSpec = ValueSpec.named("state-1").withIntType();
+    final ValueSpec<Boolean> booleanSpec = ValueSpec.named("state-2").withBooleanType();
+
+    final AddressScopedStorage storage =
+        storageOf(stateValue(intSpec, 91), stateValue(booleanSpec, true));
+
+    assertThat(storage.get(intSpec), is(Optional.of(91)));
+    assertThat(storage.get(booleanSpec), is(Optional.of(true)));
+  }
+
+  @Test
+  public void getNullValueCell() {
+    final ValueSpec<Integer> spec = ValueSpec.named("state").withIntType();
+
+    final AddressScopedStorage storage = storageOf(stateValue(spec, null));
+
+    assertThat(storage.get(spec), is(Optional.empty()));
+  }
+
+  @Test
+  public void setCell() {
+    final ValueSpec<Integer> spec = ValueSpec.named("state").withIntType();
+    final AddressScopedStorage storage = storageOf(stateValue(spec, 91));
+
+    storage.set(spec, 1111);
+
+    assertThat(storage.get(spec), is(Optional.of(1111)));
+  }
+
+  @Test
+  public void setMutableTypeCell() {
+    final ValueSpec<TestMutableType.Type> spec =
+        ValueSpec.named("state").withCustomType(new TestMutableType());
+    final AddressScopedStorage storage =
+        storageOf(stateValue(spec, new TestMutableType.Type("hello")));
+
+    final TestMutableType.Type written = new TestMutableType.Type("hello again!");
+    storage.set(spec, written);
+
+    // mutations after a set should not have any effect
+    written.mutate("this value should not be written to storage!");
+    assertThat(storage.get(spec), is(Optional.of(new TestMutableType.Type("hello again!"))));
+  }
+
+  @Test
+  public void clearCell() {
+    final ValueSpec<Integer> spec = ValueSpec.named("state").withIntType();
+    final AddressScopedStorage storage = storageOf(stateValue(spec, 91));
+
+    storage.remove(spec);
+
+    assertThat(storage.get(spec), is(Optional.empty()));
+  }
+
+  @Test
+  public void clearMutableTypeCell() {
+    final ValueSpec<TestMutableType.Type> spec =
+        ValueSpec.named("state").withCustomType(new TestMutableType());
+    final AddressScopedStorage storage =
+        storageOf(stateValue(spec, new TestMutableType.Type("hello")));
+
+    storage.remove(spec);
+
+    assertThat(storage.get(spec), is(Optional.empty()));
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void getNonExistingCell() {
+    final AddressScopedStorage storage = emptyStorage();
+
+    storage.get(ValueSpec.named("doesn't-exist").withIntType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setNonExistingCell() {
+    final AddressScopedStorage storage = emptyStorage();
+
+    storage.set(ValueSpec.named("doesn't-exist").withIntType(), 999);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void clearNonExistingCell() {
+    final AddressScopedStorage storage = emptyStorage();
+
+    storage.remove(ValueSpec.named("doesn't-exist").withIntType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setToNull() {
+    final ValueSpec<Integer> spec = ValueSpec.named("state").withIntType();
+    final AddressScopedStorage storage = storageOf(stateValue(spec, 91));
+
+    storage.set(spec, null);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void getWithWrongType() {
+    final ValueSpec<Integer> spec = ValueSpec.named("state").withIntType();
+    final AddressScopedStorage storage = storageOf(stateValue(spec, 91));
+
+    // same name, but registered as an int above.
+    storage.get(ValueSpec.named("state").withBooleanType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setWithWrongType() {
+    final ValueSpec<Integer> spec = ValueSpec.named("state").withIntType();
+    final AddressScopedStorage storage = storageOf(stateValue(spec, 91));
+
+    // same name, but registered as an int above.
+    storage.set(ValueSpec.named("state").withBooleanType(), true);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void clearWithWrongType() {
+    final ValueSpec<Integer> spec = ValueSpec.named("state").withIntType();
+    final AddressScopedStorage storage = storageOf(stateValue(spec, 91));
+
+    // same name, but registered as an int above.
+    storage.remove(ValueSpec.named("state").withBooleanType());
+  }
+
+  /** Creates a storage that holds exactly the given state values. */
+  private static AddressScopedStorage storageOf(StateValueContext<?>... values) {
+    final List<StateValueContext<?>> contexts = Arrays.asList(values);
+    return new ConcurrentAddressScopedStorage(contexts);
+  }
+
+  /** Creates a storage without any state values. */
+  private static AddressScopedStorage emptyStorage() {
+    return new ConcurrentAddressScopedStorage(Collections.emptyList());
+  }
+
+  /**
+   * Builds the state value context for {@code spec}, backed by a protocol level value that holds
+   * {@code value}. A {@code null} value is encoded as "has no value" with an empty payload.
+   */
+  private static <T> StateValueContext<T> stateValue(ValueSpec<T> spec, T value) {
+    final TypedValue.Builder typedValue =
+        TypedValue.newBuilder()
+            .setTypename(spec.type().typeName().asTypeNameString())
+            .setHasValue(value != null)
+            .setValue(toByteString(spec.type(), value));
+
+    final ToFunction.PersistedValue persisted =
+        ToFunction.PersistedValue.newBuilder()
+            .setStateName(spec.name())
+            .setStateValue(typedValue)
+            .build();
+
+    return new StateValueContext<>(spec, persisted);
+  }
+
+  /** Serializes {@code value} with the serializer of {@code type}; {@code null} becomes empty. */
+  private static <T> ByteString toByteString(Type<T> type, T value) {
+    return value == null
+        ? ByteString.EMPTY
+        : ByteString.copyFrom(type.typeSerializer().serialize(value).toByteArray());
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java
new file mode 100644
index 0000000..a9c3902
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+/**
+ * Tests for {@code StateValueContexts.resolve(...)}: registered {@link ValueSpec}s are matched
+ * by state name against the provided {@code ToFunction.PersistedValue}s, yielding the resolved
+ * state value contexts and the specs for which no value was provided.
+ */
+public final class StateValueContextsTest {
+
+  /** Every registered spec has a provided value of the same name, so all of them resolve. */
+  @Test
+  public void exampleUsage() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(2);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+    registeredSpecs.put("state-2", ValueSpec.named("state-2").withBooleanType());
+
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(2);
+    providedProtocolValues.add(protocolValue("state-1", Types.integerType(), 66));
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+
+    final List<StateValueContext<?>> resolvedStateValues =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).resolved();
+
+    assertThat(resolvedStateValues.size(), is(2));
+    assertThat(resolvedStateValues, hasItem(stateValueContextNamed("state-1")));
+    assertThat(resolvedStateValues, hasItem(stateValueContextNamed("state-2")));
+  }
+
+  /** Registered specs without a provided value are reported through {@code missingValues()}. */
+  @Test
+  public void missingProtocolValues() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(3);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+    registeredSpecs.put("state-2", ValueSpec.named("state-2").withBooleanType());
+    registeredSpecs.put("state-3", ValueSpec.named("state-3").withUtf8String());
+
+    // only value for state-2 was provided
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(1);
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+
+    final List<ValueSpec<?>> statesWithMissingValue =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).missingValues();
+
+    assertThat(statesWithMissingValue.size(), is(2));
+    assertThat(statesWithMissingValue, hasItem(valueSpec("state-1", Types.integerType())));
+    assertThat(statesWithMissingValue, hasItem(valueSpec("state-3", Types.stringType())));
+  }
+
+  /** Provided values that have no registered spec do not show up in {@code resolved()}. */
+  @Test
+  public void extraProtocolValues() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(1);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+
+    // a few extra states were provided, and should be ignored
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(3);
+    providedProtocolValues.add(protocolValue("state-1", Types.integerType(), 66));
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+    providedProtocolValues.add(protocolValue("state-3", Types.stringType(), "ignore me!"));
+
+    final List<StateValueContext<?>> resolvedStateValues =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).resolved();
+
+    assertThat(resolvedStateValues.size(), is(1));
+    ValueSpec<?> spec = resolvedStateValues.get(0).spec();
+    assertThat(spec.name(), Matchers.is("state-1"));
+  }
+
+  /**
+   * Builds a protocol-level persisted value named {@code stateName}, typed with the type name of
+   * {@code type} and carrying {@code value} serialized by that type.
+   *
+   * <p>A {@code null} value is encoded with {@code has_value == false} and empty bytes.
+   */
+  private static <T> ToFunction.PersistedValue protocolValue(
+      String stateName, Type<T> type, T value) {
+    return ToFunction.PersistedValue.newBuilder()
+        .setStateName(stateName)
+        .setStateValue(
+            TypedValue.newBuilder()
+                .setTypename(type.typeName().asTypeNameString())
+                .setHasValue(value != null)
+                .setValue(toByteString(type, value)))
+        .build();
+  }
+
+  /** Serializes {@code value} with {@code type}; {@code null} maps to {@link ByteString#EMPTY}. */
+  private static <T> ByteString toByteString(Type<T> type, T value) {
+    if (value == null) {
+      return ByteString.EMPTY;
+    }
+    return ByteString.copyFrom(type.typeSerializer().serialize(value).toByteArray());
+  }
+
+  /**
+   * Matches a {@link ValueSpec} that is named {@code stateName} and whose type object has the
+   * same runtime class as {@code type}.
+   */
+  private static <T> Matcher<ValueSpec<T>> valueSpec(String stateName, Type<T> type) {
+    return new TypeSafeMatcher<ValueSpec<T>>() {
+      @Override
+      protected boolean matchesSafely(ValueSpec<T> testSpec) {
+        return testSpec.type().getClass() == type.getClass() && testSpec.name().equals(stateName);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        // without a description, a failing assertion would print an empty expectation
+        description
+            .appendText("A ValueSpec named ")
+            .appendText(stateName)
+            .appendText(" with type class ")
+            .appendText(type.getClass().getName());
+      }
+    };
+  }
+
+  /**
+   * Matches a {@link StateValueContext} whose spec is named {@code name}; on a mismatch, the
+   * name that was actually found is appended to the mismatch description.
+   */
+  private static <T> Matcher<StateValueContext<T>> stateValueContextNamed(String name) {
+    return new TypeSafeDiagnosingMatcher<StateValueContext<T>>() {
+      @Override
+      protected boolean matchesSafely(StateValueContext<T> ctx, Description description) {
+        if (!Objects.equals(ctx.spec().name(), name)) {
+          description.appendText(ctx.spec().name());
+          return false;
+        }
+        return true;
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("A StateValueContext named ").appendText(name);
+      }
+    };
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java
new file mode 100644
index 0000000..6c31b00
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+
+/**
+ * A test-only type, named {@code test/my-mutable-type}, whose value objects can be changed in
+ * place after construction via {@link TestMutableType.Type#mutate(String)}.
+ */
+public class TestMutableType implements Type<TestMutableType.Type> {
+
+  @Override
+  public TypeName typeName() {
+    return TypeName.typeNameOf("test", "my-mutable-type");
+  }
+
+  /** Returns a new serializer instance on every call. */
+  @Override
+  public TypeSerializer<TestMutableType.Type> typeSerializer() {
+    return new Serializer();
+  }
+
+  /** Mutable holder of a single string; equality and hash code are based on that string. */
+  public static class Type {
+    private String value;
+
+    public Type(String value) {
+      this.value = value;
+    }
+
+    /** Replaces the held string in place. */
+    public void mutate(String newValue) {
+      this.value = newValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      final boolean sameClass = o != null && o.getClass() == getClass();
+      return sameClass && Objects.equals(value, ((Type) o).value);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(value);
+    }
+  }
+
+  /** Writes and reads the held string as UTF-8 bytes. */
+  private static class Serializer implements TypeSerializer<TestMutableType.Type> {
+    @Override
+    public Slice serialize(Type value) {
+      final byte[] utf8 = value.value.getBytes(StandardCharsets.UTF_8);
+      return Slices.wrap(utf8);
+    }
+
+    @Override
+    public Type deserialize(Slice bytes) {
+      final String decoded = new String(bytes.toByteArray(), StandardCharsets.UTF_8);
+      return new Type(decoded);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java
new file mode 100644
index 0000000..90f4ca1
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.types.generated.BooleanWrapper;
+import org.apache.flink.statefun.sdk.types.generated.IntWrapper;
+import org.apache.flink.statefun.sdk.types.generated.LongWrapper;
+import org.apache.flink.statefun.sdk.types.generated.StringWrapper;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class SanityPrimitiveTypeTest {
+
+ @Test
+ public void testBoolean() {
+ assertRoundTrip(Types.booleanType(), Boolean.TRUE);
+ assertRoundTrip(Types.booleanType(), Boolean.FALSE);
+ }
+
+ @Test
+ public void testInt() {
+ assertRoundTrip(Types.integerType(), 1);
+ assertRoundTrip(Types.integerType(), 1048576);
+ assertRoundTrip(Types.integerType(), Integer.MIN_VALUE);
+ assertRoundTrip(Types.integerType(), Integer.MAX_VALUE);
+ assertRoundTrip(Types.integerType(), -1);
+ }
+
+ @Test
+ public void testLong() {
+ assertRoundTrip(Types.longType(), -1L);
+ assertRoundTrip(Types.longType(), 0L);
+ assertRoundTrip(Types.longType(), Long.MIN_VALUE);
+ assertRoundTrip(Types.longType(), Long.MAX_VALUE);
+ }
+
+ @Test
+ public void testFloat() {
+ assertRoundTrip(Types.floatType(), Float.MIN_VALUE);
+ assertRoundTrip(Types.floatType(), Float.MAX_VALUE);
+ assertRoundTrip(Types.floatType(), 2.1459f);
+ assertRoundTrip(Types.floatType(), -1e-4f);
+ }
+
+ @Test
+ public void testDouble() {
+ assertRoundTrip(Types.doubleType(), Double.MIN_VALUE);
+ assertRoundTrip(Types.doubleType(), Double.MAX_VALUE);
+ assertRoundTrip(Types.doubleType(), 2.1459d);
+ assertRoundTrip(Types.doubleType(), -1e-4d);
+ }
+
+ @Test
+ public void testString() {
+ assertRoundTrip(Types.stringType(), "");
+ assertRoundTrip(Types.stringType(), "This is a string");
+ }
+
+ @Test
+ public void testRandomCompatibilityWithAnIntegerWrapper() throws InvalidProtocolBufferException {
+ TypeSerializer<Integer> serializer = Types.integerType().typeSerializer();
+ for (int i = 0; i < 1_000_000; i++) {
+ testCompatibilityWithWrapper(
+ serializer, IntWrapper::parseFrom, IntWrapper::getValue, IntWrapper::toByteArray, i);
+ }
+ }
+
+ @Test
+ public void testCompatibilityWithABooleanWrapper() throws InvalidProtocolBufferException {
+ TypeSerializer<Boolean> serializer = Types.booleanType().typeSerializer();
+ testCompatibilityWithWrapper(
+ serializer,
+ BooleanWrapper::parseFrom,
+ BooleanWrapper::getValue,
+ BooleanWrapper::toByteArray,
+ true);
+
+ testCompatibilityWithWrapper(
+ serializer,
+ BooleanWrapper::parseFrom,
+ BooleanWrapper::getValue,
+ BooleanWrapper::toByteArray,
+ false);
+ }
+
+ @Test
+ public void testRandomCompatibilityWithALongWrapper() throws InvalidProtocolBufferException {
+ TypeSerializer<Long> serializer = Types.longType().typeSerializer();
+ for (long i = 0; i < 1_000_000; i++) {
+ testCompatibilityWithWrapper(
+ serializer, LongWrapper::parseFrom, LongWrapper::getValue, LongWrapper::toByteArray, i);
+ }
+ }
+
+ @Ignore
+ @Test
+ public void testCompatibilityWithAnIntegerWrapper() throws InvalidProtocolBufferException {
+ TypeSerializer<Integer> serializer = Types.integerType().typeSerializer();
+ for (int expected = Integer.MIN_VALUE; expected != Integer.MAX_VALUE; expected++) {
+ testCompatibilityWithWrapper(
+ serializer,
+ IntWrapper::parseFrom,
+ IntWrapper::getValue,
+ IntWrapper::toByteArray,
+ expected);
+ }
+ }
+
+ @Test
+ public void testRandomCompatibilityWithStringWrapper() throws InvalidProtocolBufferException {
+ TypeSerializer<String> serializer = Types.stringType().typeSerializer();
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ for (int i = 0; i < 1_000; i++) {
+ int n = random.nextInt(4096);
+ byte[] buf = new byte[n];
+ random.nextBytes(buf);
+ String expected = new String(buf, StandardCharsets.UTF_8);
+
+ testCompatibilityWithWrapper(
+ serializer,
+ StringWrapper::parseFrom,
+ StringWrapper::getValue,
+ StringWrapper::toByteArray,
+ expected);
+ }
+ }
+
+ @FunctionalInterface
+ interface Fn<I, O> {
+ O apply(I input) throws InvalidProtocolBufferException;
+ }
+
+ private static <T, W> void testCompatibilityWithWrapper(
+ TypeSerializer<T> serializer,
+ Fn<ByteBuffer, W> parseFrom,
+ Fn<W, T> getValue,
+ Fn<W, byte[]> toByteArray,
+ T expected)
+ throws InvalidProtocolBufferException {
+ //
+ // test round trip with ourself.
+ //
+ final Slice serialized = serializer.serialize(expected);
+ final T got = serializer.deserialize(serialized);
+ assertEquals(expected, got);
+ //
+ // test that protobuf can parse what we wrote:
+ //
+ final W wrapper = parseFrom.apply(serialized.asReadOnlyByteBuffer());
+ assertEquals(expected, getValue.apply(wrapper));
+ //
+ // test that we can parse what protobuf wrote:
+ //
+ final Slice serializedByPb = Slices.wrap(toByteArray.apply(wrapper));
+ final T gotPb = serializer.deserialize(serializedByPb);
+ assertEquals(gotPb, expected);
+ // test that pb byte representation is equal to ours:
+ assertEquals(serializedByPb.asReadOnlyByteBuffer(), serialized.asReadOnlyByteBuffer());
+ }
+
+ public <T> void assertRoundTrip(Type<T> type, T element) {
+ final Slice slice;
+ {
+ TypeSerializer<T> serializer = type.typeSerializer();
+ slice = serializer.serialize(element);
+ }
+ TypeSerializer<T> serializer = type.typeSerializer();
+ T got = serializer.deserialize(slice);
+ assertEquals(element, got);
+ }
+}