You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2021/02/24 11:30:22 UTC

[flink-statefun] branch master updated (ec69df6 -> 20b521e)

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from ec69df6  [hotfix] Temporary disable E2E tests in CI
     add 96cd004  [FLINK-21457] Add support to differentiate a zero length value bytes and non existing value
     new 20b521e  [FLINK-21459] Implement remote Java SDK for Stateful Functions

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/common/types/TypedValueUtil.java         |   1 +
 .../protorouter/AutoRoutableProtobufRouter.java    |   6 +-
 .../reqreply/PersistedRemoteFunctionValues.java    |   3 +-
 .../PersistedRemoteFunctionValuesTest.java         |   6 +-
 .../core/reqreply/RequestReplyFunctionTest.java    |   2 +
 statefun-sdk-java/pom.xml                          |  19 +-
 .../java/com/google/protobuf/MoreByteStrings.java  |  23 +-
 .../apache/flink/statefun/sdk/java}/Address.java   |  16 +-
 .../statefun/sdk/java/AddressScopedStorage.java    |  16 +-
 .../flink/statefun/sdk/java/ApiExtension.java      |  20 +-
 .../apache/flink/statefun/sdk/java/Context.java    |  31 +-
 .../flink/statefun/sdk/java}/Expiration.java       |   8 +-
 .../flink/statefun/sdk/java/StatefulFunction.java  |  26 +-
 .../statefun/sdk/java/StatefulFunctionSpec.java    |  78 ++++
 .../flink/statefun/sdk/java/StatefulFunctions.java |  36 +-
 .../apache/flink/statefun/sdk/java/TypeName.java   |  59 ++-
 .../apache/flink/statefun/sdk/java/ValueSpec.java  | 115 ++++++
 .../statefun/sdk/java/annotations/Internal.java    |   8 +-
 .../sdk/java/handler/ConcurrentContext.java        | 147 +++++++
 .../handler/ConcurrentRequestReplyHandler.java     | 150 +++++++
 .../statefun/sdk/java/handler/MoreFutures.java     |  66 +++
 .../statefun/sdk/java/handler/ProtoUtils.java      |  97 +++++
 .../sdk/java/handler/RequestReplyHandler.java      |  16 +-
 .../statefun/sdk/java/io/KafkaEgressMessage.java   | 127 ++++++
 .../statefun/sdk/java/io/KinesisEgressMessage.java | 146 +++++++
 .../statefun/sdk/java/message/EgressMessage.java   |  26 +-
 .../sdk/java/message/EgressMessageWrapper.java     |  55 +++
 .../flink/statefun/sdk/java/message/Message.java   |  43 +-
 .../statefun/sdk/java/message/MessageBuilder.java  | 113 +++++
 .../statefun/sdk/java/message/MessageWrapper.java  | 133 ++++++
 .../statefun/sdk/java/slice/ByteStringSlice.java   |  80 ++++
 .../flink/statefun/sdk/java/slice/Slice.java       |  25 +-
 .../flink/statefun/sdk/java/slice/SliceOutput.java | 108 +++++
 .../statefun/sdk/java/slice/SliceProtobufUtil.java |  55 +++
 .../flink/statefun/sdk/java/slice/Slices.java      |  61 +++
 .../storage/ConcurrentAddressScopedStorage.java    | 347 ++++++++++++++++
 .../storage/IllegalStorageAccessException.java     |  10 +-
 .../sdk/java/storage/StateValueContexts.java       | 131 ++++++
 .../flink/statefun/sdk/java/types/SimpleType.java  | 104 +++++
 .../apache/flink/statefun/sdk/java/types/Type.java |  17 +-
 .../sdk/java/types/TypeCharacteristics.java        |   6 +-
 .../statefun/sdk/java/types/TypeSerializer.java    |   9 +-
 .../flink/statefun/sdk/java/types/Types.java       | 456 +++++++++++++++++++++
 .../handler/ConcurrentRequestReplyHandlerTest.java | 116 ++++++
 .../statefun/sdk/java/handler/MoreFuturesTest.java |  93 +++++
 .../flink/statefun/sdk/java/handler/TestUtils.java |  96 +++++
 .../statefun/sdk/java/slice/SliceOutputTest.java   | 144 +++++++
 .../sdk/java/slice/SliceProtobufUtilTest.java      |  20 +-
 .../ConcurrentAddressScopedStorageTest.java        | 206 ++++++++++
 .../sdk/java/storage/StateValueContextsTest.java   | 150 +++++++
 .../statefun/sdk/java/storage/TestMutableType.java |  77 ++++
 .../sdk/java/types/SanityPrimitiveTypeTest.java    | 194 +++++++++
 .../src/main/protobuf/sdk/request-reply.proto      |   5 +-
 53 files changed, 3919 insertions(+), 183 deletions(-)
 copy statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-simulator/src/main/java/org/apache/flink/statefun/examples/ridesharing/simulator/simulation/engine/LifecycleMessages.java => statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java (63%)
 copy {statefun-sdk/src/main/java/org/apache/flink/statefun/sdk => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java}/Address.java (81%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionSpec.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java (77%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/common/ManagingResources.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java (67%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/reqreply/RequestReplyClient.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java (60%)
 copy {statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/state => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java}/Expiration.java (91%)
 copy statefun-examples/statefun-python-walkthrough-example/walkthrough.proto => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java (74%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/grpcfn/GrpcFunctionProvider.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java (50%)
 copy statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/FunctionType.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java (53%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
 copy statefun-sdk/src/main/java/org/apache/flink/statefun/sdk/annotations/ForRuntime.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java (82%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java
 copy statefun-examples/statefun-async-example/src/main/java/org/apache/flink/statefun/examples/async/service/TaskQueryService.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java (64%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java
 copy statefun-examples/statefun-python-walkthrough-example/walkthrough.proto => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java (73%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionTypeMetrics.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java (54%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/logger/FeedbackLogger.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java (68%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/ModuleConfigurationException.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java (75%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/FunctionSpec.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java (70%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/jsonmodule/ModuleType.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java (88%)
 copy statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/metrics/FunctionDispatcherMetrics.java => statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java (80%)
 create mode 100644 statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java
 create mode 100644 statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java
 create mode 100644 statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java
 create mode 100644 statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java
 create mode 100644 statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java
 copy statefun-e2e-tests/statefun-smoke-e2e/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeVerificationE2E.java => statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java (64%)
 create mode 100644 statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java
 create mode 100644 statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java
 create mode 100644 statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java
 create mode 100644 statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java


[flink-statefun] 01/01: [FLINK-21459] Implement remote Java SDK for Stateful Functions

Posted by tz...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 20b521e5c131c914617008033bd4c63fba81fe97
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue Feb 23 18:09:36 2021 +0800

    [FLINK-21459] Implement remote Java SDK for Stateful Functions
    
    Add the initial implementation of the Java SDK on top of the new remote invocation protocol.
    
    Co-authored-by: Tzu-Li (Gordon) Tai <tz...@apache.org>
    
    This closes #201.
---
 statefun-sdk-java/pom.xml                          |  19 +-
 .../java/com/google/protobuf/MoreByteStrings.java  |  39 ++
 .../apache/flink/statefun/sdk/java/Address.java    |  85 ++++
 .../statefun/sdk/java/AddressScopedStorage.java    |  28 ++
 .../flink/statefun/sdk/java/ApiExtension.java      |  34 ++
 .../apache/flink/statefun/sdk/java/Context.java    |  43 ++
 .../apache/flink/statefun/sdk/java/Expiration.java |  92 +++++
 .../flink/statefun/sdk/java/StatefulFunction.java  |  26 ++
 .../statefun/sdk/java/StatefulFunctionSpec.java    |  78 ++++
 .../flink/statefun/sdk/java/StatefulFunctions.java |  46 +++
 .../apache/flink/statefun/sdk/java/TypeName.java   | 137 +++++++
 .../apache/flink/statefun/sdk/java/ValueSpec.java  | 115 ++++++
 .../statefun/sdk/java/annotations/Internal.java    |  30 ++
 .../sdk/java/handler/ConcurrentContext.java        | 147 +++++++
 .../handler/ConcurrentRequestReplyHandler.java     | 150 +++++++
 .../statefun/sdk/java/handler/MoreFutures.java     |  66 +++
 .../statefun/sdk/java/handler/ProtoUtils.java      |  97 +++++
 .../sdk/java/handler/RequestReplyHandler.java      |  34 ++
 .../statefun/sdk/java/io/KafkaEgressMessage.java   | 127 ++++++
 .../statefun/sdk/java/io/KinesisEgressMessage.java | 146 +++++++
 .../statefun/sdk/java/message/EgressMessage.java   |  30 ++
 .../sdk/java/message/EgressMessageWrapper.java     |  55 +++
 .../flink/statefun/sdk/java/message/Message.java   |  60 +++
 .../statefun/sdk/java/message/MessageBuilder.java  | 113 +++++
 .../statefun/sdk/java/message/MessageWrapper.java  | 133 ++++++
 .../statefun/sdk/java/slice/ByteStringSlice.java   |  80 ++++
 .../flink/statefun/sdk/java/slice/Slice.java       |  40 ++
 .../flink/statefun/sdk/java/slice/SliceOutput.java | 108 +++++
 .../statefun/sdk/java/slice/SliceProtobufUtil.java |  55 +++
 .../flink/statefun/sdk/java/slice/Slices.java      |  61 +++
 .../storage/ConcurrentAddressScopedStorage.java    | 347 ++++++++++++++++
 .../storage/IllegalStorageAccessException.java     |  28 ++
 .../sdk/java/storage/StateValueContexts.java       | 131 ++++++
 .../flink/statefun/sdk/java/types/SimpleType.java  | 104 +++++
 .../apache/flink/statefun/sdk/java/types/Type.java |  33 ++
 .../sdk/java/types/TypeCharacteristics.java        |  22 +
 .../statefun/sdk/java/types/TypeSerializer.java    |  27 ++
 .../flink/statefun/sdk/java/types/Types.java       | 456 +++++++++++++++++++++
 .../handler/ConcurrentRequestReplyHandlerTest.java | 116 ++++++
 .../statefun/sdk/java/handler/MoreFuturesTest.java |  93 +++++
 .../flink/statefun/sdk/java/handler/TestUtils.java |  96 +++++
 .../statefun/sdk/java/slice/SliceOutputTest.java   | 144 +++++++
 .../sdk/java/slice/SliceProtobufUtilTest.java      |  36 ++
 .../ConcurrentAddressScopedStorageTest.java        | 206 ++++++++++
 .../sdk/java/storage/StateValueContextsTest.java   | 150 +++++++
 .../statefun/sdk/java/storage/TestMutableType.java |  77 ++++
 .../sdk/java/types/SanityPrimitiveTypeTest.java    | 194 +++++++++
 47 files changed, 4529 insertions(+), 5 deletions(-)

diff --git a/statefun-sdk-java/pom.xml b/statefun-sdk-java/pom.xml
index 62bb8de..f58829a 100644
--- a/statefun-sdk-java/pom.xml
+++ b/statefun-sdk-java/pom.xml
@@ -24,13 +24,10 @@ under the License.
         <version>2.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-
     <artifactId>statefun-sdk-java</artifactId>
-
     <properties>
         <additional-sources.dir>target/additional-sources</additional-sources.dir>
     </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -42,8 +39,20 @@ under the License.
             <artifactId>protobuf-java</artifactId>
             <version>${protobuf.version}</version>
         </dependency>
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
-
     <build>
         <plugins>
             <!--
@@ -145,4 +154,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java b/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java
new file mode 100644
index 0000000..9c4269e
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.protobuf;
+
+import java.nio.ByteBuffer;
+
+public class MoreByteStrings {
+
+  public static ByteString wrap(byte[] bytes) {
+    return ByteString.wrap(bytes);
+  }
+
+  public static ByteString wrap(byte[] bytes, int offset, int len) {
+    return ByteString.wrap(bytes, offset, len);
+  }
+
+  public static ByteString wrap(ByteBuffer buffer) {
+    return ByteString.wrap(buffer);
+  }
+
+  public static ByteString concat(ByteString first, ByteString second) {
+    return first.concat(second);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java
new file mode 100644
index 0000000..86dbe76
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.Objects;
+
+/**
+ * An {@link Address} is the unique identity of an individual {@link StatefulFunction}, containing
+ * of the function's {@link TypeName} and an unique identifier within the type. The function's type
+ * denotes the class of function to invoke, while the unique identifier addresses the invocation to
+ * a specific function instance.
+ */
+public final class Address {
+  private final TypeName type;
+  private final String id;
+
+  /**
+   * Creates an {@link Address}.
+   *
+   * @param type type of the function.
+   * @param id unique id within the function type.
+   */
+  public Address(TypeName type, String id) {
+    this.type = Objects.requireNonNull(type);
+    this.id = Objects.requireNonNull(id);
+  }
+
+  /**
+   * Returns the {@link TypeName} that this address identifies.
+   *
+   * @return type of the function
+   */
+  public TypeName type() {
+    return type;
+  }
+
+  /**
+   * Returns the unique function id, within its type, that this address identifies.
+   *
+   * @return unique id within the function type.
+   */
+  public String id() {
+    return id;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Address address = (Address) o;
+    return type.equals(address.type) && id.equals(address.id);
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 0;
+    hash = 37 * hash + type.hashCode();
+    hash = 37 * hash + id.hashCode();
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Address(%s, %s, %s)", type.namespace(), type.name(), id);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
new file mode 100644
index 0000000..d04310a
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.Optional;
+
+public interface AddressScopedStorage {
+  <T> Optional<T> get(ValueSpec<T> descriptor);
+
+  <T> void set(ValueSpec<T> key, T value);
+
+  <T> void remove(ValueSpec<T> key);
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java
new file mode 100644
index 0000000..2d4ebe4
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+
+@Internal
+public final class ApiExtension {
+
+  public static ByteString typeNameByteString(TypeName typeName) {
+    return typeName.typeNameByteString();
+  }
+
+  public static ByteString stateNameByteString(ValueSpec<?> spec) {
+    return spec.nameByteString();
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
new file mode 100644
index 0000000..9bc011b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+public interface Context {
+
+  Address self();
+
+  Optional<Address> caller();
+
+  void send(Message message);
+
+  void sendAfter(Duration duration, Message message);
+
+  void send(EgressMessage message);
+
+  AddressScopedStorage storage();
+
+  default CompletableFuture<Void> done() {
+    return CompletableFuture.completedFuture(null);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
new file mode 100644
index 0000000..060c7ab
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * State Expiration Configuration
+ *
+ * <p>This class defines the way state can be auto expired by the runtime. State expiration (also
+ * known as state TTL) can be used to keep state from growing arbitrarily by assigning an expiration
+ * date to a value.
+ *
+ * <p>State can be expired after a duration had passed since either from the last write to the
+ * state, or the last read.
+ */
+public final class Expiration implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  public enum Mode {
+    NONE,
+    AFTER_WRITE,
+    AFTER_READ_OR_WRITE;
+  }
+
+  /**
+   * Returns an Expiration configuration that would expire a @duration after the last write.
+   *
+   * @param duration a duration to wait before considering the state expired.
+   */
+  public static Expiration expireAfterWriting(Duration duration) {
+    return new Expiration(Mode.AFTER_WRITE, duration);
+  }
+
+  /**
+   * Returns an Expiration configuration that would expire a @duration after the last write or read.
+   *
+   * @param duration a duration to wait before considering the state expired.
+   */
+  public static Expiration expireAfterReadingOrWriting(Duration duration) {
+    return new Expiration(Mode.AFTER_READ_OR_WRITE, duration);
+  }
+
+  public static Expiration expireAfter(Duration duration, Mode mode) {
+    return new Expiration(mode, duration);
+  }
+
+  /** @return Returns a disabled expiration */
+  public static Expiration none() {
+    return new Expiration(Mode.NONE, Duration.ZERO);
+  }
+
+  private final Mode mode;
+  private final Duration duration;
+
+  private Expiration(Mode mode, Duration duration) {
+    this.mode = Objects.requireNonNull(mode);
+    this.duration = Objects.requireNonNull(duration);
+  }
+
+  public Mode mode() {
+    return mode;
+  }
+
+  public Duration duration() {
+    return duration;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Expiration{mode=%s, duration=%s}", mode, duration);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
new file mode 100644
index 0000000..bd9aef6
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+public interface StatefulFunction {
+
+  CompletableFuture<Void> apply(Context context, Message argument) throws Throwable;
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
new file mode 100644
index 0000000..b7dc7bc
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+public final class StatefulFunctionSpec {
+  private final TypeName typeName;
+  private final Map<String, ValueSpec<?>> knownValues;
+  private final Supplier<? extends StatefulFunction> supplier;
+
+  public static Builder builder(TypeName typeName) {
+    return new Builder(typeName);
+  }
+
+  private StatefulFunctionSpec(
+      TypeName typeName,
+      Map<String, ValueSpec<?>> knownValues,
+      Supplier<? extends StatefulFunction> supplier) {
+    this.typeName = Objects.requireNonNull(typeName);
+    this.supplier = Objects.requireNonNull(supplier);
+    this.knownValues = Objects.requireNonNull(knownValues);
+  }
+
+  public TypeName typeName() {
+    return typeName;
+  }
+
+  public Map<String, ValueSpec<?>> knownValues() {
+    return knownValues;
+  }
+
+  public Supplier<? extends StatefulFunction> supplier() {
+    return supplier;
+  }
+
+  public static final class Builder {
+    private final TypeName typeName;
+    private final Map<String, ValueSpec<?>> knownValues = new HashMap<>();
+    private Supplier<? extends StatefulFunction> supplier;
+
+    private Builder(TypeName typeName) {
+      this.typeName = Objects.requireNonNull(typeName);
+    }
+
+    public Builder withValueSpec(ValueSpec<?> valueSpec) {
+      knownValues.put(valueSpec.name(), valueSpec);
+      return this;
+    }
+
+    public Builder withSupplier(Supplier<? extends StatefulFunction> supplier) {
+      this.supplier = Objects.requireNonNull(supplier);
+      return this;
+    }
+
+    public StatefulFunctionSpec build() {
+      return new StatefulFunctionSpec(typeName, knownValues, supplier);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
new file mode 100644
index 0000000..9561827
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+
+public class StatefulFunctions {
+  private final Map<TypeName, StatefulFunctionSpec> specs = new HashMap<>();
+
+  public StatefulFunctions withStatefulFunction(StatefulFunctionSpec.Builder builder) {
+    StatefulFunctionSpec spec = builder.build();
+    specs.put(spec.typeName(), spec);
+    return this;
+  }
+
+  public StatefulFunctions withStatefulFunction(StatefulFunctionSpec spec) {
+    specs.put(spec.typeName(), spec);
+    return this;
+  }
+
+  public Map<TypeName, StatefulFunctionSpec> functionSpecs() {
+    return specs;
+  }
+
+  public RequestReplyHandler requestReplyHandler() {
+    return new ConcurrentRequestReplyHandler(specs);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
new file mode 100644
index 0000000..ff7fde5
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * This class represents the type of a {@code StatefulFunction}, consisting of a namespace of the
+ * function type as well as the type's name.
+ *
+ * <p>A function's type is part of a function's {@link Address} and serves as integral part of an
+ * individual function's identity.
+ *
+ * @see Address
+ */
+public final class TypeName implements Serializable {
+
+  private static final long serialVersionUID = 1;
+
+  private final String namespace;
+  private final String type;
+  private final String typenameString;
+  private final ByteString typenameByteString;
+
+  public static TypeName typeNameOf(String namespace, String name) {
+    Objects.requireNonNull(namespace);
+    Objects.requireNonNull(name);
+    if (namespace.endsWith("/")) {
+      namespace = namespace.substring(0, namespace.length() - 1);
+    }
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("namespace can not be empty.");
+    }
+    if (name.isEmpty()) {
+      throw new IllegalArgumentException("name can not be empty.");
+    }
+    return new TypeName(namespace, name);
+  }
+
+  public static TypeName typeNameFromString(String typeNameString) {
+    Objects.requireNonNull(typeNameString);
+    final int pos = typeNameString.lastIndexOf("/");
+    if (pos <= 0 || pos == typeNameString.length() - 1) {
+      throw new IllegalArgumentException(
+          typeNameString + " does not conform to the <namespace>/<name> format");
+    }
+    String namespace = typeNameString.substring(0, pos);
+    String name = typeNameString.substring(pos + 1);
+    return typeNameOf(namespace, name);
+  }
+
+  /**
+   * Creates a {@link TypeName}.
+   *
+   * @param namespace the function type's namepsace.
+   * @param type the function type's name.
+   */
+  private TypeName(String namespace, String type) {
+    this.namespace = Objects.requireNonNull(namespace);
+    this.type = Objects.requireNonNull(type);
+    String typenameString = canonicalTypeNameString(namespace, type);
+    this.typenameString = typenameString;
+    this.typenameByteString = ByteString.copyFromUtf8(typenameString);
+  }
+
+  /**
+   * Returns the namespace of the function type.
+   *
+   * @return the namespace of the function type.
+   */
+  public String namespace() {
+    return namespace;
+  }
+
+  /**
+   * Returns the name of the function type.
+   *
+   * @return the name of the function type.
+   */
+  public String name() {
+    return type;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TypeName functionType = (TypeName) o;
+    return namespace.equals(functionType.namespace) && type.equals(functionType.type);
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 0;
+    hash = 37 * hash + namespace.hashCode();
+    hash = 37 * hash + type.hashCode();
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    return "TypeName(" + namespace + ", " + type + ")";
+  }
+
+  public String asTypeNameString() {
+    return typenameString;
+  }
+
+  ByteString typeNameByteString() {
+    return typenameByteString;
+  }
+
+  private static String canonicalTypeNameString(String namespace, String type) {
+    return namespace + '/' + type;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
new file mode 100644
index 0000000..d62d2b7
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.Types;
+
+public final class ValueSpec<T> {
+
+  public static Untyped named(String name) {
+    Objects.requireNonNull(name);
+    return new Untyped(name);
+  }
+
+  private final String name;
+  private final Expiration expiration;
+  private final Type<T> type;
+  private final ByteString nameByteString;
+
+  private ValueSpec(Untyped untyped, Type<T> type) {
+    Objects.requireNonNull(untyped);
+    Objects.requireNonNull(type);
+    this.name = untyped.stateName;
+    this.expiration = untyped.expiration;
+    this.type = Objects.requireNonNull(type);
+    this.nameByteString = ByteString.copyFromUtf8(untyped.stateName);
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public Expiration expiration() {
+    return expiration;
+  }
+
+  public TypeName typeName() {
+    return type.typeName();
+  }
+
+  public Type<T> type() {
+    return type;
+  }
+
+  ByteString nameByteString() {
+    return nameByteString;
+  }
+
+  public static final class Untyped {
+    private final String stateName;
+    private Expiration expiration = Expiration.none();
+
+    public Untyped(String name) {
+      this.stateName = Objects.requireNonNull(name);
+    }
+
+    public Untyped thatExpireAfterWrite(Duration duration) {
+      this.expiration = Expiration.expireAfterWriting(duration);
+      return this;
+    }
+
+    public Untyped thatExpiresAfterReadOrWrite(Duration duration) {
+      this.expiration = Expiration.expireAfterReadingOrWriting(duration);
+      return this;
+    }
+
+    public ValueSpec<Integer> withIntType() {
+      return withCustomType(Types.integerType());
+    }
+
+    public ValueSpec<Long> withLongType() {
+      return withCustomType(Types.longType());
+    }
+
+    public ValueSpec<Float> withFloatType() {
+      return withCustomType(Types.floatType());
+    }
+
+    public ValueSpec<Double> withDoubleType() {
+      return withCustomType(Types.doubleType());
+    }
+
+    public ValueSpec<String> withUtf8String() {
+      return withCustomType(Types.stringType());
+    }
+
+    public ValueSpec<Boolean> withBooleanType() {
+      return new ValueSpec<>(this, Types.booleanType());
+    }
+
+    public <T> ValueSpec<T> withCustomType(Type<T> type) {
+      Objects.requireNonNull(type);
+      return new ValueSpec<>(this, type);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java
new file mode 100644
index 0000000..a544b71
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Methods, constructors or classes annotated with this annotation, are used for by the SDK
+ * internally.
+ */
+@Documented
+@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.TYPE})
+public @interface Internal {}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java
new file mode 100644
index 0000000..49e9fcc
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.getTypedValue;
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.protoAddressFromSdk;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+
+/**
+ * A thread safe implementation of a {@linkplain Context}.
+ *
+ * <p>This context's life cycle is tied to a single batch request. It is constructed when a
+ * {@linkplain org.apache.flink.statefun.sdk.reqreply.generated.ToFunction} message arrives, and it
+ * carries enough context to compute an {@linkplain
+ * org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.InvocationResponse}. Access to the
+ * send/sendAfter/sendEgress methods are synchronized with a @responseBuilder's lock, to prevent
+ * concurrent modification. When the last invocation of the batch completes successfully, a {@link
+ * #finalBuilder()} will be called. After that point no further operations are allowed.
+ */
+final class ConcurrentContext implements Context {
+  private final org.apache.flink.statefun.sdk.java.Address self;
+  private final FromFunction.InvocationResponse.Builder responseBuilder;
+  private final ConcurrentAddressScopedStorage storage;
+  private boolean noFurtherModificationsAllowed;
+
+  private Address caller;
+
+  public ConcurrentContext(
+      org.apache.flink.statefun.sdk.java.Address self,
+      FromFunction.InvocationResponse.Builder responseBuilder,
+      ConcurrentAddressScopedStorage storage) {
+    this.self = Objects.requireNonNull(self);
+    this.responseBuilder = Objects.requireNonNull(responseBuilder);
+    this.storage = Objects.requireNonNull(storage);
+  }
+
+  @Override
+  public org.apache.flink.statefun.sdk.java.Address self() {
+    return self;
+  }
+
+  void setCaller(Address address) {
+    this.caller = address;
+  }
+
+  FromFunction.InvocationResponse.Builder finalBuilder() {
+    synchronized (responseBuilder) {
+      noFurtherModificationsAllowed = true;
+      return responseBuilder;
+    }
+  }
+
+  @Override
+  public Optional<Address> caller() {
+    return Optional.ofNullable(caller);
+  }
+
+  @Override
+  public void send(Message message) {
+    Objects.requireNonNull(message);
+
+    FromFunction.Invocation outInvocation =
+        FromFunction.Invocation.newBuilder()
+            .setArgument(getTypedValue(message))
+            .setTarget(protoAddressFromSdk(message.targetAddress()))
+            .build();
+
+    synchronized (responseBuilder) {
+      checkNotDone();
+      responseBuilder.addOutgoingMessages(outInvocation);
+    }
+  }
+
+  @Override
+  public void sendAfter(Duration duration, Message message) {
+    Objects.requireNonNull(duration);
+    Objects.requireNonNull(message);
+
+    FromFunction.DelayedInvocation outInvocation =
+        FromFunction.DelayedInvocation.newBuilder()
+            .setArgument(getTypedValue(message))
+            .setTarget(protoAddressFromSdk(message.targetAddress()))
+            .setDelayInMs(duration.toMillis())
+            .build();
+
+    synchronized (responseBuilder) {
+      checkNotDone();
+      responseBuilder.addDelayedInvocations(outInvocation);
+    }
+  }
+
+  @Override
+  public void send(EgressMessage message) {
+    Objects.requireNonNull(message);
+
+    TypeName target = message.targetEgressId();
+
+    FromFunction.EgressMessage outInvocation =
+        FromFunction.EgressMessage.newBuilder()
+            .setArgument(getTypedValue(message))
+            .setEgressNamespace(target.namespace())
+            .setEgressType(target.name())
+            .build();
+
+    synchronized (responseBuilder) {
+      checkNotDone();
+      responseBuilder.addOutgoingEgresses(outInvocation);
+    }
+  }
+
+  @Override
+  public AddressScopedStorage storage() {
+    return storage;
+  }
+
+  private void checkNotDone() {
+    if (noFurtherModificationsAllowed) {
+      throw new IllegalStateException("Function has already completed its execution.");
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java
new file mode 100644
index 0000000..7397525
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.MoreFutures.applySequentially;
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.sdkAddressFromProto;
+
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.message.MessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.storage.StateValueContexts;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * A threadsafe {@linkplain RequestReplyHandler}. This handler lifecycle is bound to the entire
+ * program, and can be safely and concurrently used to handle {@linkplain ToFunction} requests.
+ */
+@Internal
+public final class ConcurrentRequestReplyHandler implements RequestReplyHandler {
+  private final Map<TypeName, StatefulFunctionSpec> functionSpecs;
+
+  public ConcurrentRequestReplyHandler(Map<TypeName, StatefulFunctionSpec> functionSpecs) {
+    this.functionSpecs = Objects.requireNonNull(functionSpecs);
+  }
+
+  @Override
+  public CompletableFuture<Slice> handle(Slice requestBytes) {
+    try {
+      ByteString in = SliceProtobufUtil.asByteString(requestBytes);
+      ToFunction request = ToFunction.parseFrom(in);
+      CompletableFuture<FromFunction> response = handleInternally(request);
+      return response.thenApply(
+          res -> {
+            ByteString out = res.toByteString();
+            return SliceProtobufUtil.asSlice(out);
+          });
+    } catch (Throwable throwable) {
+      return MoreFutures.exceptional(throwable);
+    }
+  }
+
+  CompletableFuture<FromFunction> handleInternally(ToFunction request) {
+    if (!request.hasInvocation()) {
+      return CompletableFuture.completedFuture(FromFunction.getDefaultInstance());
+    }
+    ToFunction.InvocationBatchRequest batchRequest = request.getInvocation();
+    Address self = sdkAddressFromProto(batchRequest.getTarget());
+    StatefulFunctionSpec targetSpec = functionSpecs.get(self.type());
+    if (targetSpec == null) {
+      throw new IllegalStateException("Unknown target type " + self);
+    }
+    Supplier<? extends StatefulFunction> supplier = targetSpec.supplier();
+    if (supplier == null) {
+      throw new NullPointerException("missing function supplier for " + self);
+    }
+    StatefulFunction function = supplier.get();
+    if (function == null) {
+      throw new NullPointerException("supplier for " + self + " supplied NULL function.");
+    }
+    StateValueContexts.ResolutionResult stateResolution =
+        StateValueContexts.resolve(targetSpec.knownValues(), batchRequest.getStateList());
+    if (stateResolution.hasMissingValues()) {
+      // not enough information to compute this batch.
+      FromFunction res = buildIncompleteInvocationResponse(stateResolution.missingValues());
+      return CompletableFuture.completedFuture(res);
+    }
+    final ConcurrentAddressScopedStorage storage =
+        new ConcurrentAddressScopedStorage(stateResolution.resolved());
+    return executeBatch(batchRequest, self, storage, function);
+  }
+
+  private CompletableFuture<FromFunction> executeBatch(
+      ToFunction.InvocationBatchRequest inputBatch,
+      Address self,
+      ConcurrentAddressScopedStorage storage,
+      StatefulFunction function) {
+
+    FromFunction.InvocationResponse.Builder responseBuilder =
+        FromFunction.InvocationResponse.newBuilder();
+
+    ConcurrentContext context = new ConcurrentContext(self, responseBuilder, storage);
+
+    CompletableFuture<Void> allDone =
+        applySequentially(
+            inputBatch.getInvocationsList(), invocation -> apply(function, context, invocation));
+
+    return allDone.thenApply(unused -> finalizeResponse(storage, context.finalBuilder()));
+  }
+
+  private static FromFunction buildIncompleteInvocationResponse(List<ValueSpec<?>> missing) {
+    FromFunction.IncompleteInvocationContext.Builder result =
+        FromFunction.IncompleteInvocationContext.newBuilder();
+
+    for (ValueSpec<?> v : missing) {
+      result.addMissingValues(ProtoUtils.protoFromValueSpec(v));
+    }
+
+    return FromFunction.newBuilder().setIncompleteInvocationContext(result).build();
+  }
+
+  private static CompletableFuture<Void> apply(
+      StatefulFunction function, ConcurrentContext context, ToFunction.Invocation invocation)
+      throws Throwable {
+    TypedValue argument = invocation.getArgument();
+    MessageWrapper wrapper = new MessageWrapper(context.self(), argument);
+    context.setCaller(sdkAddressFromProto(invocation.getCaller()));
+    CompletableFuture<Void> future = function.apply(context, wrapper);
+    if (future == null) {
+      throw new IllegalStateException(
+          "User function " + context.self() + " has returned a NULL future.");
+    }
+    return future;
+  }
+
+  private static FromFunction finalizeResponse(
+      ConcurrentAddressScopedStorage storage, FromFunction.InvocationResponse.Builder builder) {
+    storage.addMutations(builder::addStateMutations);
+    return FromFunction.newBuilder().setInvocationResult(builder).build();
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java
new file mode 100644
index 0000000..bd3a9e1
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+final class MoreFutures {
+
+  @FunctionalInterface
+  public interface Fn<I, O> {
+    O apply(I input) throws Throwable;
+  }
+
+  /**
+   * Apply @fn for each element of @elements sequentially. Subsequent element is handed to fn only
+   * after the previous future has completed.
+   */
+  public static <T> CompletableFuture<Void> applySequentially(
+      Iterable<T> elements, Fn<T, CompletableFuture<Void>> fn) {
+    Objects.requireNonNull(elements);
+    Objects.requireNonNull(fn);
+    return applySequentially(elements.iterator(), fn);
+  }
+
+  private static <T> CompletableFuture<Void> applySequentially(
+      Iterator<T> iterator, Fn<T, CompletableFuture<Void>> fn) {
+    try {
+      while (iterator.hasNext()) {
+        T next = iterator.next();
+        CompletableFuture<Void> future = fn.apply(next);
+        if (!future.isDone()) {
+          return future.thenCompose(ignored -> applySequentially(iterator, fn));
+        }
+        if (future.isCompletedExceptionally()) {
+          return future;
+        }
+      }
+      return CompletableFuture.completedFuture(null);
+    } catch (Throwable t) {
+      return exceptional(t);
+    }
+  }
+
+  static <T> CompletableFuture<T> exceptional(Throwable cause) {
+    CompletableFuture<T> e = new CompletableFuture<>();
+    e.completeExceptionally(cause);
+    return e;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java
new file mode 100644
index 0000000..cbeb5c3
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec.ExpireMode;
+import static org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec.newBuilder;
+
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.Expiration;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+final class ProtoUtils {
+  private ProtoUtils() {}
+
+  static Address protoAddressFromSdk(org.apache.flink.statefun.sdk.java.Address address) {
+    return Address.newBuilder()
+        .setNamespace(address.type().namespace())
+        .setType(address.type().name())
+        .setId(address.id())
+        .build();
+  }
+
+  static org.apache.flink.statefun.sdk.java.Address sdkAddressFromProto(Address address) {
+    if (address == null
+        || (address.getNamespace().isEmpty()
+            && address.getType().isEmpty()
+            && address.getId().isEmpty())) {
+      return null;
+    }
+    return new org.apache.flink.statefun.sdk.java.Address(
+        TypeName.typeNameOf(address.getNamespace(), address.getType()), address.getId());
+  }
+
+  static PersistedValueSpec.Builder protoFromValueSpec(ValueSpec<?> valueSpec) {
+    PersistedValueSpec.Builder specBuilder =
+        PersistedValueSpec.newBuilder()
+            .setStateNameBytes(ApiExtension.stateNameByteString(valueSpec))
+            .setTypeTypenameBytes(ApiExtension.typeNameByteString(valueSpec.typeName()));
+
+    if (valueSpec.expiration().mode() == Expiration.Mode.NONE) {
+      return specBuilder;
+    }
+
+    ExpireMode mode =
+        valueSpec.expiration().mode() == Expiration.Mode.AFTER_READ_OR_WRITE
+            ? ExpireMode.AFTER_INVOKE
+            : ExpireMode.AFTER_WRITE;
+    long value = valueSpec.expiration().duration().toMillis();
+
+    specBuilder.setExpirationSpec(newBuilder().setExpireAfterMillis(value).setMode(mode));
+    return specBuilder;
+  }
+
+  static TypedValue getTypedValue(Message message) {
+    if (message instanceof MessageWrapper) {
+      return ((MessageWrapper) message).typedValue();
+    }
+    return TypedValue.newBuilder()
+        .setTypenameBytes(ApiExtension.typeNameByteString(message.valueTypeName()))
+        .setValue(SliceProtobufUtil.asByteString(message.rawValue()))
+        .build();
+  }
+
+  static TypedValue getTypedValue(EgressMessage message) {
+    if (message instanceof EgressMessageWrapper) {
+      return ((EgressMessageWrapper) message).typedValue();
+    }
+    return TypedValue.newBuilder()
+        .setTypenameBytes(ApiExtension.typeNameByteString(message.egressMessageValueType()))
+        .setValue(SliceProtobufUtil.asByteString(message.egressMessageValueBytes()))
+        .build();
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java
new file mode 100644
index 0000000..ed7783c
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.handler;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+public interface RequestReplyHandler {
+
+  /**
+   * Handles a {@code Stateful Functions} invocation.
+   *
+   * @param input a {@linkplain Slice} as received from the {@code StateFun} server.
+   * @return a serialized representation of the side-effects to preform by the {@code StateFun}
+   *     server, as the result of this invocation.
+   */
+  CompletableFuture<Slice> handle(Slice input);
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java
new file mode 100644
index 0000000..c55b582
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.io;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.egress.generated.KafkaProducerRecord;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+public final class KafkaEgressMessage {
+
+  public static Builder forEgress(TypeName targetEgressId) {
+    Objects.requireNonNull(targetEgressId);
+    return new Builder(targetEgressId);
+  }
+
+  public static final class Builder {
+    private static final TypeName KAFKA_PRODUCER_RECORD_TYPENAME =
+        TypeName.typeNameOf(
+            "type.googleapis.com", KafkaProducerRecord.getDescriptor().getFullName());
+
+    private final TypeName targetEgressId;
+    private ByteString targetTopic;
+    private ByteString keyBytes;
+    private ByteString value;
+
+    private Builder(TypeName targetEgressId) {
+      this.targetEgressId = targetEgressId;
+    }
+
+    public Builder withTopic(String topic) {
+      this.targetTopic = ByteString.copyFromUtf8(topic);
+      return this;
+    }
+
+    public Builder withUtf8Key(String key) {
+      Objects.requireNonNull(key);
+      this.keyBytes = ByteString.copyFromUtf8(key);
+      return this;
+    }
+
+    public Builder withKey(byte[] key) {
+      Objects.requireNonNull(key);
+      this.keyBytes = ByteString.copyFrom(key);
+      return this;
+    }
+
+    public Builder withKey(Slice slice) {
+      Objects.requireNonNull(slice);
+      this.keyBytes = SliceProtobufUtil.asByteString(slice);
+      return this;
+    }
+
+    public <T> Builder withKey(Type<T> type, T value) {
+      TypeSerializer<T> serializer = type.typeSerializer();
+      return withKey(serializer.serialize(value));
+    }
+
+    public Builder withUtf8Value(String value) {
+      Objects.requireNonNull(value);
+      this.value = ByteString.copyFromUtf8(value);
+      return this;
+    }
+
+    public Builder withValue(Slice slice) {
+      Objects.requireNonNull(value);
+      this.value = SliceProtobufUtil.asByteString(slice);
+      return this;
+    }
+
+    public <T> Builder withValue(Type<T> type, T value) {
+      TypeSerializer<T> serializer = type.typeSerializer();
+      return withValue(serializer.serialize(value));
+    }
+
+    public Builder withValue(byte[] value) {
+      Objects.requireNonNull(value);
+      this.value = ByteString.copyFrom(value);
+      return this;
+    }
+
+    public EgressMessage build() {
+      if (targetTopic == null) {
+        throw new IllegalStateException("A Kafka record requires a target topic.");
+      }
+      if (value == null) {
+        throw new IllegalStateException("A Kafka record requires value bytes");
+      }
+      KafkaProducerRecord.Builder builder =
+          KafkaProducerRecord.newBuilder().setTopicBytes(targetTopic).setValueBytes(value);
+      if (keyBytes != null) {
+        builder.setKeyBytes(keyBytes);
+      }
+      KafkaProducerRecord record = builder.build();
+      TypedValue typedValue =
+          TypedValue.newBuilder()
+              .setTypenameBytes(ApiExtension.typeNameByteString(KAFKA_PRODUCER_RECORD_TYPENAME))
+              .setValue(record.toByteString())
+              .build();
+
+      return new EgressMessageWrapper(targetEgressId, typedValue);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java
new file mode 100644
index 0000000..000bb59
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.io;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.egress.generated.KinesisEgressRecord;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+public final class KinesisEgressMessage {
+
+  public static Builder forEgress(TypeName targetEgressId) {
+    Objects.requireNonNull(targetEgressId);
+    return new Builder(targetEgressId);
+  }
+
+  public static final class Builder {
+    private static final TypeName KINESIS_PRODUCER_RECORD_TYPENAME =
+        TypeName.typeNameOf(
+            "type.googleapis.com", KinesisEgressRecord.getDescriptor().getFullName());
+
+    private final TypeName targetEgressId;
+    private ByteString targetStreamBytes;
+    private ByteString partitionKeyBytes;
+    private ByteString valueBytes;
+    private ByteString explicitHashKey;
+
+    private Builder(TypeName targetEgressId) {
+      this.targetEgressId = targetEgressId;
+    }
+
+    public Builder withStream(String stream) {
+      Objects.requireNonNull(stream);
+      this.targetStreamBytes = ByteString.copyFromUtf8(stream);
+      return this;
+    }
+
+    public Builder withStream(Slice stream) {
+      this.targetStreamBytes = SliceProtobufUtil.asByteString(stream);
+      return this;
+    }
+
+    public Builder withUtf8PartitionKey(String key) {
+      Objects.requireNonNull(key);
+      this.partitionKeyBytes = ByteString.copyFromUtf8(key);
+      return this;
+    }
+
+    public Builder withPartitionKey(byte[] key) {
+      Objects.requireNonNull(key);
+      this.partitionKeyBytes = ByteString.copyFrom(key);
+      return this;
+    }
+
+    public Builder withPartitionKey(Slice key) {
+      Objects.requireNonNull(key);
+      this.partitionKeyBytes = SliceProtobufUtil.asByteString(key);
+      return this;
+    }
+
+    public Builder withUtf8Value(String value) {
+      Objects.requireNonNull(value);
+      this.valueBytes = ByteString.copyFromUtf8(value);
+      return this;
+    }
+
+    public Builder withValue(byte[] value) {
+      Objects.requireNonNull(value);
+      this.valueBytes = ByteString.copyFrom(value);
+      return this;
+    }
+
+    public Builder withValue(Slice value) {
+      Objects.requireNonNull(value);
+      this.valueBytes = SliceProtobufUtil.asByteString(value);
+      return this;
+    }
+
+    public <T> Builder withValue(Type<T> type, T value) {
+      TypeSerializer<T> serializer = type.typeSerializer();
+      return withValue(serializer.serialize(value));
+    }
+
+    public Builder withUtf8ExplicitHashKey(String value) {
+      Objects.requireNonNull(value);
+      this.explicitHashKey = ByteString.copyFromUtf8(value);
+      return this;
+    }
+
+    public Builder withUtf8ExplicitHashKey(Slice utf8Slice) {
+      Objects.requireNonNull(utf8Slice);
+      this.explicitHashKey = SliceProtobufUtil.asByteString(utf8Slice);
+      return this;
+    }
+
+    public EgressMessage build() {
+      KinesisEgressRecord.Builder builder = KinesisEgressRecord.newBuilder();
+      if (targetStreamBytes == null) {
+        throw new IllegalStateException("Missing destination Kinesis stream");
+      }
+      builder.setStreamBytes(targetStreamBytes);
+      if (partitionKeyBytes == null) {
+        throw new IllegalStateException("Missing partition key");
+      }
+      builder.setPartitionKeyBytes(partitionKeyBytes);
+      if (valueBytes == null) {
+        throw new IllegalStateException("Missing value");
+      }
+      builder.setValueBytes(valueBytes);
+      if (explicitHashKey != null) {
+        builder.setExplicitHashKeyBytes(explicitHashKey);
+      }
+
+      TypedValue typedValue =
+          TypedValue.newBuilder()
+              .setTypenameBytes(ApiExtension.typeNameByteString(KINESIS_PRODUCER_RECORD_TYPENAME))
+              .setValue(builder.build().toByteString())
+              .build();
+
+      return new EgressMessageWrapper(targetEgressId, typedValue);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java
new file mode 100644
index 0000000..0b8be7b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.message;
+
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+public interface EgressMessage {
+  TypeName targetEgressId();
+
+  TypeName egressMessageValueType();
+
+  Slice egressMessageValueBytes();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java
new file mode 100644
index 0000000..9c849a2
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+@Internal
+public final class EgressMessageWrapper implements EgressMessage {
+  private final TypedValue typedValue;
+  private final TypeName targetEgressId;
+
+  public EgressMessageWrapper(TypeName targetEgressId, TypedValue actualMessage) {
+    this.targetEgressId = Objects.requireNonNull(targetEgressId);
+    this.typedValue = Objects.requireNonNull(actualMessage);
+  }
+
+  @Override
+  public TypeName targetEgressId() {
+    return targetEgressId;
+  }
+
+  @Override
+  public TypeName egressMessageValueType() {
+    return TypeName.typeNameFromString(typedValue.getTypename());
+  }
+
+  @Override
+  public Slice egressMessageValueBytes() {
+    return SliceProtobufUtil.asSlice(typedValue.getValue());
+  }
+
+  public TypedValue typedValue() {
+    return typedValue;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java
new file mode 100644
index 0000000..e2e3467
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.message;
+
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.types.Type;
+
+public interface Message {
+  Address targetAddress();
+
+  boolean isLong();
+
+  long asLong();
+
+  boolean isUtf8String();
+
+  String asUtf8String();
+
+  boolean isInt();
+
+  int asInt();
+
+  boolean isBoolean();
+
+  boolean asBoolean();
+
+  boolean isFloat();
+
+  float asFloat();
+
+  boolean isDouble();
+
+  double asDouble();
+
+  <T> boolean is(Type<T> type);
+
+  <T> T as(Type<T> type);
+
+  TypeName valueTypeName();
+
+  Slice rawValue();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java
new file mode 100644
index 0000000..cd9f738
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+public final class MessageBuilder {
+  private final TypedValue.Builder builder;
+  private Address targetAddress;
+
+  private MessageBuilder(TypeName functionType, String id) {
+    this(functionType, id, TypedValue.newBuilder());
+  }
+
+  private MessageBuilder(TypeName functionType, String id, TypedValue.Builder builder) {
+    this.targetAddress = new Address(functionType, id);
+    this.builder = Objects.requireNonNull(builder);
+  }
+
+  public static MessageBuilder forAddress(TypeName functionType, String id) {
+    return new MessageBuilder(functionType, id);
+  }
+
+  public static MessageBuilder forAddress(Address address) {
+    Objects.requireNonNull(address);
+    return new MessageBuilder(address.type(), address.id());
+  }
+
+  public static MessageBuilder fromMessage(Message message) {
+    Address targetAddress = message.targetAddress();
+    TypedValue.Builder builder = typedValueBuilder(message);
+    return new MessageBuilder(targetAddress.type(), targetAddress.id(), builder);
+  }
+
+  public MessageBuilder withValue(long value) {
+    return withCustomType(Types.longType(), value);
+  }
+
+  public MessageBuilder withValue(int value) {
+    return withCustomType(Types.integerType(), value);
+  }
+
+  public MessageBuilder withValue(boolean value) {
+    return withCustomType(Types.booleanType(), value);
+  }
+
+  public MessageBuilder withValue(String value) {
+    return withCustomType(Types.stringType(), value);
+  }
+
+  public MessageBuilder withValue(float value) {
+    return withCustomType(Types.floatType(), value);
+  }
+
+  public MessageBuilder withValue(double value) {
+    return withCustomType(Types.doubleType(), value);
+  }
+
+  public MessageBuilder withTargetAddress(Address targetAddress) {
+    this.targetAddress = Objects.requireNonNull(targetAddress);
+    return this;
+  }
+
+  public MessageBuilder withTargetAddress(TypeName typeName, String id) {
+    return withTargetAddress(new Address(typeName, id));
+  }
+
+  public <T> MessageBuilder withCustomType(Type<T> customType, T element) {
+    Objects.requireNonNull(customType);
+    Objects.requireNonNull(element);
+    TypeSerializer<T> typeSerializer = customType.typeSerializer();
+    builder.setTypenameBytes(ApiExtension.typeNameByteString(customType.typeName()));
+    Slice serialized = typeSerializer.serialize(element);
+    ByteString serializedByteString = SliceProtobufUtil.asByteString(serialized);
+    builder.setValue(serializedByteString);
+    return this;
+  }
+
+  public Message build() {
+    return new MessageWrapper(targetAddress, builder.build());
+  }
+
+  private static TypedValue.Builder typedValueBuilder(Message message) {
+    ByteString typenameBytes = ApiExtension.typeNameByteString(message.valueTypeName());
+    ByteString valueBytes = SliceProtobufUtil.asByteString(message.rawValue());
+    return TypedValue.newBuilder().setTypenameBytes(typenameBytes).setValue(valueBytes);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java
new file mode 100644
index 0000000..74c5955
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+@Internal
+public final class MessageWrapper implements Message {
+  private final TypedValue typedValue;
+  private final Address targetAddress;
+
+  public MessageWrapper(Address targetAddress, TypedValue typedValue) {
+    this.targetAddress = Objects.requireNonNull(targetAddress);
+    this.typedValue = Objects.requireNonNull(typedValue);
+  }
+
+  @Override
+  public Address targetAddress() {
+    return targetAddress;
+  }
+
+  @Override
+  public boolean isLong() {
+    return is(Types.longType());
+  }
+
+  @Override
+  public long asLong() {
+    return as(Types.longType());
+  }
+
+  @Override
+  public boolean isUtf8String() {
+    return is(Types.stringType());
+  }
+
+  @Override
+  public String asUtf8String() {
+    return as(Types.stringType());
+  }
+
+  @Override
+  public boolean isInt() {
+    return is(Types.integerType());
+  }
+
+  @Override
+  public int asInt() {
+    return as(Types.integerType());
+  }
+
+  @Override
+  public boolean isBoolean() {
+    return is(Types.booleanType());
+  }
+
+  @Override
+  public boolean asBoolean() {
+    return as(Types.booleanType());
+  }
+
+  @Override
+  public boolean isFloat() {
+    return is(Types.floatType());
+  }
+
+  @Override
+  public float asFloat() {
+    return as(Types.floatType());
+  }
+
+  @Override
+  public boolean isDouble() {
+    return is(Types.doubleType());
+  }
+
+  @Override
+  public double asDouble() {
+    return as(Types.doubleType());
+  }
+
+  @Override
+  public <T> boolean is(Type<T> type) {
+    String thisTypeNameString = typedValue.getTypename();
+    String thatTypeNameString = type.typeName().asTypeNameString();
+    return thisTypeNameString.equals(thatTypeNameString);
+  }
+
+  @Override
+  public <T> T as(Type<T> type) {
+    TypeSerializer<T> typeSerializer = type.typeSerializer();
+    Slice input = SliceProtobufUtil.asSlice(typedValue.getValue());
+    return typeSerializer.deserialize(input);
+  }
+
+  @Override
+  public TypeName valueTypeName() {
+    return TypeName.typeNameFromString(typedValue.getTypename());
+  }
+
+  @Override
+  public Slice rawValue() {
+    return SliceProtobufUtil.asSlice(typedValue.getValue());
+  }
+
+  public TypedValue typedValue() {
+    return typedValue;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java
new file mode 100644
index 0000000..c28aa58
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+final class ByteStringSlice implements Slice {
+  private final ByteString byteString;
+
+  public ByteStringSlice(ByteString bytes) {
+    this.byteString = Objects.requireNonNull(bytes);
+  }
+
+  public ByteString byteString() {
+    return byteString;
+  }
+
+  @Override
+  public ByteBuffer asReadOnlyByteBuffer() {
+    return byteString.asReadOnlyByteBuffer();
+  }
+
+  @Override
+  public int readableBytes() {
+    return byteString.size();
+  }
+
+  @Override
+  public void copyTo(byte[] target) {
+    copyTo(target, 0);
+  }
+
+  @Override
+  public void copyTo(byte[] target, int targetOffset) {
+    byteString.copyTo(target, targetOffset);
+  }
+
+  @Override
+  public void copyTo(OutputStream outputStream) {
+    try {
+      byteString.writeTo(outputStream);
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  @Override
+  public byte byteAt(int position) {
+    return byteString.byteAt(position);
+  }
+
+  @Override
+  public void copyTo(ByteBuffer buffer) {
+    byteString.copyTo(buffer);
+  }
+
+  @Override
+  public byte[] toByteArray() {
+    return byteString.toByteArray();
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java
new file mode 100644
index 0000000..8c6187e
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+public interface Slice {
+
+  int readableBytes();
+
+  void copyTo(ByteBuffer target);
+
+  void copyTo(byte[] target);
+
+  void copyTo(byte[] target, int targetOffset);
+
+  void copyTo(OutputStream outputStream);
+
+  byte byteAt(int position);
+
+  ByteBuffer asReadOnlyByteBuffer();
+
+  byte[] toByteArray();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java
new file mode 100644
index 0000000..2573e4f
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+public final class SliceOutput {
+  private byte[] buf;
+  private int position;
+
+  public static SliceOutput sliceOutput(int initialSize) {
+    return new SliceOutput(initialSize);
+  }
+
+  private SliceOutput(int initialSize) {
+    if (initialSize < 0) {
+      throw new IllegalArgumentException("initial size has to be non negative");
+    }
+    this.buf = new byte[initialSize];
+    this.position = 0;
+  }
+
+  public void write(byte b) {
+    ensureCapacity(1);
+    buf[position] = b;
+    position++;
+  }
+
+  public void write(byte[] buffer) {
+    write(buffer, 0, buffer.length);
+  }
+
+  public void write(byte[] buffer, int offset, int len) {
+    Objects.requireNonNull(buffer);
+    if (offset < 0 || offset > buffer.length) {
+      throw new IllegalArgumentException("Offset out of range " + offset);
+    }
+    if (len < 0) {
+      throw new IllegalArgumentException("Negative length " + len);
+    }
+    ensureCapacity(len);
+    System.arraycopy(buffer, offset, buf, position, len);
+    position += len;
+  }
+
+  public void write(ByteBuffer buffer) {
+    int n = buffer.remaining();
+    ensureCapacity(n);
+    buffer.get(buf, position, n);
+    position += n;
+  }
+
+  public void write(Slice slice) {
+    write(slice.asReadOnlyByteBuffer());
+  }
+
+  public void writeFully(InputStream input) {
+    try {
+      int bytesRead;
+      do {
+        ensureCapacity(256);
+        bytesRead = input.read(buf, position, remaining());
+        position += bytesRead;
+      } while (bytesRead != -1);
+      position++; // compensate for the latest -1 addition.
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public Slice copyOf() {
+    return Slices.copyOf(buf, 0, position);
+  }
+
+  public Slice view() {
+    return Slices.wrap(buf, 0, position);
+  }
+
+  private int remaining() {
+    return buf.length - position;
+  }
+
+  private void ensureCapacity(final int bytesNeeded) {
+    final int requiredNewLength = position + bytesNeeded;
+    if (requiredNewLength >= buf.length) {
+      this.buf = Arrays.copyOf(buf, 2 * requiredNewLength);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java
new file mode 100644
index 0000000..2a87b5b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.MoreByteStrings;
+import com.google.protobuf.Parser;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+
+@Internal
+public final class SliceProtobufUtil {
+  private SliceProtobufUtil() {}
+
+  public static <T> T parseFrom(Parser<T> parser, Slice slice)
+      throws InvalidProtocolBufferException {
+    if (slice instanceof ByteStringSlice) {
+      ByteString byteString = ((ByteStringSlice) slice).byteString();
+      return parser.parseFrom(byteString);
+    }
+    return parser.parseFrom(slice.asReadOnlyByteBuffer());
+  }
+
+  public static Slice toSlice(Message message) {
+    return Slices.wrap(message.toByteArray());
+  }
+
+  public static ByteString asByteString(Slice slice) {
+    if (slice instanceof ByteStringSlice) {
+      ByteStringSlice byteStringSlice = (ByteStringSlice) slice;
+      return byteStringSlice.byteString();
+    }
+    return MoreByteStrings.wrap(slice.asReadOnlyByteBuffer());
+  }
+
+  public static Slice asSlice(ByteString byteString) {
+    return new ByteStringSlice(byteString);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java
new file mode 100644
index 0000000..7f42291
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.MoreByteStrings;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public final class Slices {
+  private Slices() {}
+
+  public static Slice wrap(ByteBuffer buffer) {
+    return wrap(MoreByteStrings.wrap(buffer));
+  }
+
+  public static Slice wrap(byte[] bytes) {
+    return wrap(MoreByteStrings.wrap(bytes));
+  }
+
+  private static Slice wrap(ByteString bytes) {
+    return new ByteStringSlice(bytes);
+  }
+
+  public static Slice wrap(byte[] bytes, int offset, int len) {
+    return wrap(MoreByteStrings.wrap(bytes, offset, len));
+  }
+
+  public static Slice copyOf(byte[] bytes) {
+    return wrap(ByteString.copyFrom(bytes));
+  }
+
+  public static Slice copyOf(byte[] bytes, int offset, int len) {
+    return wrap(ByteString.copyFrom(bytes, offset, len));
+  }
+
+  public static Slice copyOf(InputStream inputStream, int expectedStreamSize) {
+    SliceOutput out = SliceOutput.sliceOutput(expectedStreamSize);
+    out.writeFully(inputStream);
+    return out.view();
+  }
+
+  public static Slice copyFromUtf8(String input) {
+    return wrap(ByteString.copyFromUtf8(input));
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
new file mode 100644
index 0000000..d692c13
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.TypeCharacteristics;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+@Internal
+public final class ConcurrentAddressScopedStorage implements AddressScopedStorage {
+
+  private final List<Cell<?>> cells;
+
+  public ConcurrentAddressScopedStorage(List<StateValueContext<?>> stateValues) {
+    this.cells = createCells(stateValues);
+  }
+
+  @Override
+  public <T> Optional<T> get(ValueSpec<T> valueSpec) {
+    final Cell<T> cell = getCellOrThrow(valueSpec);
+    return cell.get();
+  }
+
+  @Override
+  public <T> void set(ValueSpec<T> valueSpec, T value) {
+    final Cell<T> cell = getCellOrThrow(valueSpec);
+    cell.set(value);
+  }
+
+  @Override
+  public <T> void remove(ValueSpec<T> valueSpec) {
+    final Cell<T> cell = getCellOrThrow(valueSpec);
+    cell.remove();
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> Cell<T> getCellOrThrow(ValueSpec<T> runtimeSpec) {
+    // fast path: the user used the same ValueSpec reference to declare the function
+    // and to index into the state.
+    for (Cell<?> cell : cells) {
+      ValueSpec<?> registeredSpec = cell.spec();
+      if (runtimeSpec == registeredSpec) {
+        return (Cell<T>) cell;
+      }
+    }
+    return slowGetCellOrThrow(runtimeSpec);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> Cell<T> slowGetCellOrThrow(ValueSpec<T> valueSpec) {
+    // unlikely slow path: when the users used a different ValueSpec instance in registration
+    // and at runtime.
+    for (Cell<?> cell : cells) {
+      ValueSpec<?> thisSpec = cell.spec();
+      String thisName = thisSpec.name();
+      if (!thisName.equals(valueSpec.name())) {
+        continue;
+      }
+      if (thisSpec.typeName().equals(valueSpec.typeName())) {
+        return (Cell<T>) cell;
+      }
+      throw new IllegalStorageAccessException(
+          valueSpec.name(),
+          "Accessed state with incorrect type; state type was registered as "
+              + thisSpec.typeName()
+              + ", but was accessed as type "
+              + valueSpec.typeName());
+    }
+    throw new IllegalStorageAccessException(
+        valueSpec.name(), "State does not exist; make sure that this state was registered.");
+  }
+
+  public void addMutations(Consumer<PersistedValueMutation> consumer) {
+    for (Cell<?> cell : cells) {
+      cell.toProtocolValueMutation().ifPresent(consumer);
+    }
+  }
+
+  // ===============================================================================
+  //  Thread-safe state value cells
+  // ===============================================================================
+
+  private interface Cell<T> {
+    Optional<T> get();
+
+    void set(T value);
+
+    void remove();
+
+    Optional<FromFunction.PersistedValueMutation> toProtocolValueMutation();
+
+    ValueSpec<T> spec();
+  }
+
+  private static <T> Optional<T> tryDeserialize(
+      TypeSerializer<T> serializer, TypedValue typedValue) {
+    if (!typedValue.getHasValue()) {
+      return Optional.empty();
+    }
+    Slice slice = SliceProtobufUtil.asSlice(typedValue.getValue());
+    T value = serializer.deserialize(slice);
+    return Optional.ofNullable(value);
+  }
+
+  private static <T> ByteString serialize(TypeSerializer<T> serializer, T value) {
+    Slice slice = serializer.serialize(value);
+    return SliceProtobufUtil.asByteString(slice);
+  }
+
+  private static final class ImmutableTypeCell<T> implements Cell<T> {
+    private final ReentrantLock lock = new ReentrantLock();
+    private final ValueSpec<T> spec;
+    private final TypedValue typedValue;
+    private final TypeSerializer<T> serializer;
+
+    private CellStatus status = CellStatus.UNMODIFIED;
+    private T cachedObject;
+
+    public ImmutableTypeCell(ValueSpec<T> spec, TypedValue typedValue) {
+      this.spec = spec;
+      this.typedValue = typedValue;
+      this.serializer = Objects.requireNonNull(spec.type().typeSerializer());
+    }
+
+    @Override
+    public Optional<T> get() {
+      lock.lock();
+      try {
+        if (status == CellStatus.DELETED) {
+          return Optional.empty();
+        }
+        if (cachedObject != null) {
+          return Optional.of(cachedObject);
+        }
+        Optional<T> result = tryDeserialize(serializer, typedValue);
+        result.ifPresent(object -> this.cachedObject = object);
+        return result;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void set(T value) {
+      if (value == null) {
+        throw new IllegalStorageAccessException(
+            spec.name(), "Can not set state to NULL. Please use remove() instead.");
+      }
+      lock.lock();
+      try {
+        cachedObject = value;
+        status = CellStatus.MODIFIED;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void remove() {
+      lock.lock();
+      try {
+        cachedObject = null;
+        status = CellStatus.DELETED;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public Optional<PersistedValueMutation> toProtocolValueMutation() {
+      final String typeNameString = spec.typeName().asTypeNameString();
+      switch (status) {
+        case MODIFIED:
+          final TypedValue.Builder newValue =
+              TypedValue.newBuilder()
+                  .setTypename(typeNameString)
+                  .setHasValue(true)
+                  .setValue(serialize(serializer, cachedObject));
+
+          return Optional.of(
+              PersistedValueMutation.newBuilder()
+                  .setStateName(spec.name())
+                  .setMutationType(PersistedValueMutation.MutationType.MODIFY)
+                  .setStateValue(newValue)
+                  .build());
+        case DELETED:
+          return Optional.of(
+              PersistedValueMutation.newBuilder()
+                  .setStateName(spec.name())
+                  .setMutationType(PersistedValueMutation.MutationType.DELETE)
+                  .build());
+        case UNMODIFIED:
+          return Optional.empty();
+        default:
+          throw new IllegalStateException("Unknown cell status: " + status);
+      }
+    }
+
+    @Override
+    public ValueSpec<T> spec() {
+      return spec;
+    }
+  }
+
+  private static final class MutableTypeCell<T> implements Cell<T> {
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private final TypeSerializer<T> serializer;
+    private final ValueSpec<T> spec;
+    private TypedValue typedValue;
+    private CellStatus status = CellStatus.UNMODIFIED;
+
+    private MutableTypeCell(ValueSpec<T> spec, TypedValue typedValue) {
+      this.spec = spec;
+      this.typedValue = typedValue;
+      this.serializer = Objects.requireNonNull(spec.type().typeSerializer());
+    }
+
+    @Override
+    public Optional<T> get() {
+      lock.lock();
+      try {
+        if (status == CellStatus.DELETED) {
+          return Optional.empty();
+        }
+        return tryDeserialize(serializer, typedValue);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void set(T value) {
+      if (value == null) {
+        throw new IllegalStorageAccessException(
+            spec.name(), "Can not set state to NULL. Please use remove() instead.");
+      }
+      lock.lock();
+      try {
+        final TypedValue newTypedValue =
+            this.typedValue
+                .toBuilder()
+                .setHasValue(true)
+                .setValue(serialize(serializer, value))
+                .build();
+        this.typedValue = newTypedValue;
+        this.status = CellStatus.MODIFIED;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void remove() {
+      lock.lock();
+      try {
+        status = CellStatus.DELETED;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public Optional<PersistedValueMutation> toProtocolValueMutation() {
+      switch (status) {
+        case MODIFIED:
+          return Optional.of(
+              PersistedValueMutation.newBuilder()
+                  .setStateName(spec.name())
+                  .setMutationType(PersistedValueMutation.MutationType.MODIFY)
+                  .setStateValue(typedValue)
+                  .build());
+        case DELETED:
+          return Optional.of(
+              PersistedValueMutation.newBuilder()
+                  .setStateName(spec.name())
+                  .setMutationType(PersistedValueMutation.MutationType.DELETE)
+                  .build());
+        case UNMODIFIED:
+          return Optional.empty();
+        default:
+          throw new IllegalStateException("Unknown cell status: " + status);
+      }
+    }
+
+    @Override
+    public ValueSpec<T> spec() {
+      return spec;
+    }
+  }
+
+  private enum CellStatus {
+    UNMODIFIED,
+    MODIFIED,
+    DELETED
+  }
+
+  private static List<Cell<?>> createCells(List<StateValueContext<?>> stateValues) {
+    final List<Cell<?>> cells = new ArrayList<>(stateValues.size());
+
+    for (StateValueContext<?> stateValueContext : stateValues) {
+      final TypedValue typedValue = stateValueContext.protocolValue().getStateValue();
+      final ValueSpec<?> spec = stateValueContext.spec();
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      final Cell<?> cell =
+          spec.type().typeCharacteristics().contains(TypeCharacteristics.IMMUTABLE_VALUES)
+              ? new ImmutableTypeCell(spec, typedValue)
+              : new MutableTypeCell(spec, typedValue);
+
+      cells.add(cell);
+    }
+
+    return cells;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java
new file mode 100644
index 0000000..3a84840
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+public final class IllegalStorageAccessException extends RuntimeException {
+
+  private static final long serialVersionUID = 1;
+
+  protected IllegalStorageAccessException(String stateName, String message) {
+    super("Error accessing state " + stateName + "; " + message);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java
new file mode 100644
index 0000000..009c7aa
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+
+/**
+ * Utility for pairing registered {@link ValueSpec}s with values provided by the protocol's {@link
+ * ToFunction} message.
+ */
+@Internal
+public final class StateValueContexts {
+
+  public static final class StateValueContext<T> {
+    private final ValueSpec<T> spec;
+    private final ToFunction.PersistedValue protocolValue;
+
+    StateValueContext(ValueSpec<T> spec, ToFunction.PersistedValue protocolValue) {
+      this.spec = Objects.requireNonNull(spec);
+      this.protocolValue = Objects.requireNonNull(protocolValue);
+    }
+
+    public ValueSpec<T> spec() {
+      return spec;
+    }
+
+    public ToFunction.PersistedValue protocolValue() {
+      return protocolValue;
+    }
+  }
+
+  public static final class ResolutionResult {
+    private final List<StateValueContext<?>> resolved;
+    private final List<ValueSpec<?>> missingValues;
+
+    private ResolutionResult(
+        List<StateValueContext<?>> resolved, List<ValueSpec<?>> missingValues) {
+      this.resolved = resolved;
+      this.missingValues = missingValues;
+    }
+
+    public boolean hasMissingValues() {
+      return missingValues != null;
+    }
+
+    public List<StateValueContext<?>> resolved() {
+      return resolved;
+    }
+
+    public List<ValueSpec<?>> missingValues() {
+      return missingValues;
+    }
+  }
+
+  /**
+   * Tries to resolve any registered states that are known to us by the {@link ValueSpec} with the
+   * states provided to us by the runtime.
+   */
+  public static ResolutionResult resolve(
+      Map<String, ValueSpec<?>> registeredSpecs,
+      List<ToFunction.PersistedValue> protocolProvidedValues) {
+
+    // holds a set of missing ValueSpec's. a missing ValueSpec is a value spec that was
+    // registered by the user but wasn't sent to the SDK by the runtime.
+    // this can happen upon an initial request.
+    List<ValueSpec<?>> statesWithMissingValue =
+        null; // optimize for normal execution, where states aren't missing.
+
+    // holds the StateValueContext that will be used to serialize and deserialize user state.
+    final List<StateValueContext<?>> resolvedStateValues = new ArrayList<>(registeredSpecs.size());
+
+    for (ValueSpec<?> spec : registeredSpecs.values()) {
+      ToFunction.PersistedValue persistedValue =
+          findPersistedValueByName(protocolProvidedValues, spec.name());
+
+      if (persistedValue != null) {
+        resolvedStateValues.add(new StateValueContext<>(spec, persistedValue));
+      } else {
+        // oh no. the runtime doesn't know (yet) about a state that was registered by the user.
+        // we need to collect these.
+        statesWithMissingValue =
+            (statesWithMissingValue != null)
+                ? statesWithMissingValue
+                : new ArrayList<>(registeredSpecs.size());
+        statesWithMissingValue.add(spec);
+      }
+    }
+
+    if (statesWithMissingValue == null) {
+      return new ResolutionResult(resolvedStateValues, null);
+    }
+    return new ResolutionResult(null, statesWithMissingValue);
+  }
+
+  /**
+   * finds a {@linkplain org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.PersistedValue}
+   * with a given name. The size of the list, in practice is expected to be very small (0-10) items.
+   * just use a plain linear search.
+   */
+  private static ToFunction.PersistedValue findPersistedValueByName(
+      List<ToFunction.PersistedValue> protocolProvidedValues, String name) {
+    for (ToFunction.PersistedValue value : protocolProvidedValues) {
+      if (Objects.equals(value.getStateName(), name)) {
+        return value;
+      }
+    }
+    return null;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
new file mode 100644
index 0000000..71d5488
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+
+public final class SimpleType<T> implements Type<T> {
+
+  @FunctionalInterface
+  public interface Fn<I, O> {
+    O apply(I input) throws Throwable;
+  }
+
+  public static <T> Type<T> simpleTypeFrom(
+      TypeName typeName, Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+    return new SimpleType<>(typeName, serialize, deserialize, Collections.emptySet());
+  }
+
+  public static <T> Type<T> simpleImmutableTypeFrom(
+      TypeName typeName, Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+    return new SimpleType<>(
+        typeName, serialize, deserialize, EnumSet.of(TypeCharacteristics.IMMUTABLE_VALUES));
+  }
+
+  private final TypeName typeName;
+  private final TypeSerializer<T> serializer;
+  private final Set<TypeCharacteristics> typeCharacteristics;
+
+  public SimpleType(
+      TypeName typeName,
+      Fn<T, byte[]> serialize,
+      Fn<byte[], T> deserialize,
+      Set<TypeCharacteristics> typeCharacteristics) {
+    this.typeName = Objects.requireNonNull(typeName);
+    this.serializer = new Serializer<>(serialize, deserialize);
+    this.typeCharacteristics = Collections.unmodifiableSet(EnumSet.copyOf(typeCharacteristics));
+  }
+
+  @Override
+  public TypeName typeName() {
+    return typeName;
+  }
+
+  @Override
+  public TypeSerializer<T> typeSerializer() {
+    return serializer;
+  }
+
+  @Override
+  public Set<TypeCharacteristics> typeCharacteristics() {
+    return typeCharacteristics;
+  }
+
+  private static final class Serializer<T> implements TypeSerializer<T> {
+    private final Fn<T, byte[]> serialize;
+    private final Fn<byte[], T> deserialize;
+
+    private Serializer(Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+      this.serialize = Objects.requireNonNull(serialize);
+      this.deserialize = Objects.requireNonNull(deserialize);
+    }
+
+    @Override
+    public Slice serialize(T value) {
+      try {
+        byte[] bytes = serialize.apply(value);
+        return Slices.wrap(bytes);
+      } catch (Throwable throwable) {
+        throw new IllegalStateException(throwable);
+      }
+    }
+
+    @Override
+    public T deserialize(Slice input) {
+      try {
+        byte[] bytes = input.toByteArray();
+        return deserialize.apply(bytes);
+      } catch (Throwable throwable) {
+        throw new IllegalStateException(throwable);
+      }
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
new file mode 100644
index 0000000..7343b5a
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+
+public interface Type<T> {
+
+  TypeName typeName();
+
+  TypeSerializer<T> typeSerializer();
+
+  default Set<TypeCharacteristics> typeCharacteristics() {
+    return Collections.emptySet();
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java
new file mode 100644
index 0000000..c29bde2
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+public enum TypeCharacteristics {
+  IMMUTABLE_VALUES
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java
new file mode 100644
index 0000000..c60e802
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+public interface TypeSerializer<T> {
+
+  Slice serialize(T value);
+
+  T deserialize(Slice bytes);
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java
new file mode 100644
index 0000000..daa4fbf
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import static org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil.parseFrom;
+import static org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil.toSlice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MoreByteStrings;
+import com.google.protobuf.WireFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.types.generated.BooleanWrapper;
+import org.apache.flink.statefun.sdk.types.generated.DoubleWrapper;
+import org.apache.flink.statefun.sdk.types.generated.FloatWrapper;
+import org.apache.flink.statefun.sdk.types.generated.IntWrapper;
+import org.apache.flink.statefun.sdk.types.generated.LongWrapper;
+import org.apache.flink.statefun.sdk.types.generated.StringWrapper;
+
+public final class Types {
+  private Types() {}
+
+  // primitives
+  public static final TypeName BOOLEAN_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/bool");
+  public static final TypeName INTEGER_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/int");
+  public static final TypeName FLOAT_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/float");
+  public static final TypeName LONG_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/long");
+  public static final TypeName DOUBLE_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/double");
+  public static final TypeName STRING_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/string");
+
+  // common characteristics
+  private static final Set<TypeCharacteristics> IMMUTABLE_TYPE_CHARS =
+      Collections.unmodifiableSet(EnumSet.of(TypeCharacteristics.IMMUTABLE_VALUES));
+
+  public static Type<Long> longType() {
+    return LongType.INSTANCE;
+  }
+
+  public static Type<String> stringType() {
+    return StringType.INSTANCE;
+  }
+
+  public static Type<Integer> integerType() {
+    return IntegerType.INSTANCE;
+  }
+
+  public static Type<Boolean> booleanType() {
+    return BooleanType.INSTANCE;
+  }
+
+  public static Type<Float> floatType() {
+    return FloatType.INSTANCE;
+  }
+
+  public static Type<Double> doubleType() {
+    return DoubleType.INSTANCE;
+  }
+
+  /**
+   * Compute the Protobuf field tag, as specified by the Protobuf wire format. See {@code
+   * WireFormat#makeTag(int, int)}}. NOTE: that, currently, for all StateFun provided wire types the
+   * tags should be 1 byte.
+   *
+   * @param fieldNumber the field number as specified in the message definition.
+   * @param wireType the field type as specified in the message definition.
+   * @return the field tag as a single byte.
+   */
+  private static byte protobufTagAsSingleByte(int fieldNumber, int wireType) {
+    int fieldTag = fieldNumber << 3 | wireType;
+    if (fieldTag < -127 || fieldTag > 127) {
+      throw new IllegalStateException(
+          "Protobuf Wrapper type compatibility is bigger than one byte.");
+    }
+    return (byte) fieldTag;
+  }
+
+  private static final Slice EMPTY_SLICE = Slices.wrap(new byte[] {});
+
+  private static final class LongType implements Type<Long> {
+
+    static final Type<Long> INSTANCE = new LongType();
+
+    private final TypeSerializer<Long> serializer = new LongTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return LONG_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Long> typeSerializer() {
+      return serializer;
+    }
+
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  private static final class LongTypeSerializer implements TypeSerializer<Long> {
+
+    @Override
+    public Slice serialize(Long element) {
+      return serializeLongWrapperCompatibleLong(element);
+    }
+
+    @Override
+    public Long deserialize(Slice input) {
+      return deserializeLongWrapperCompatibleLong(input);
+    }
+
+    private static final byte WRAPPER_TYPE_FIELD_TAG =
+        protobufTagAsSingleByte(LongWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_FIXED64);
+
+    private static Slice serializeLongWrapperCompatibleLong(long n) {
+      if (n == 0) {
+        return EMPTY_SLICE;
+      }
+      byte[] out = new byte[9];
+      out[0] = WRAPPER_TYPE_FIELD_TAG;
+      out[1] = (byte) (n & 0xFF);
+      out[2] = (byte) ((n >> 8) & 0xFF);
+      out[3] = (byte) ((n >> 16) & 0xFF);
+      out[4] = (byte) ((n >> 24) & 0xFF);
+      out[5] = (byte) ((n >> 32) & 0xFF);
+      out[6] = (byte) ((n >> 40) & 0xFF);
+      out[7] = (byte) ((n >> 48) & 0xFF);
+      out[8] = (byte) ((n >> 56) & 0xFF);
+      return Slices.wrap(out);
+    }
+
+    private static long deserializeLongWrapperCompatibleLong(Slice slice) {
+      if (slice.readableBytes() == 0) {
+        return 0;
+      }
+      if (slice.byteAt(0) != WRAPPER_TYPE_FIELD_TAG) {
+        throw new IllegalStateException("Not a LongWrapper");
+      }
+      return slice.byteAt(1) & 0xFFL
+          | (slice.byteAt(2) & 0xFFL) << 8
+          | (slice.byteAt(3) & 0xFFL) << 16
+          | (slice.byteAt(4) & 0xFFL) << 24
+          | (slice.byteAt(5) & 0xFFL) << 32
+          | (slice.byteAt(6) & 0xFFL) << 40
+          | (slice.byteAt(7) & 0xFFL) << 48
+          | (slice.byteAt(8) & 0xFFL) << 56;
+    }
+  }
+
+  private static final class StringType implements Type<String> {
+
+    static final Type<String> INSTANCE = new StringType();
+
+    private final TypeSerializer<String> serializer = new StringTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return STRING_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<String> typeSerializer() {
+      return serializer;
+    }
+
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  private static final class StringTypeSerializer implements TypeSerializer<String> {
+
+    private static final byte STRING_WRAPPER_FIELD_TYPE =
+        protobufTagAsSingleByte(
+            StringWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+
+    @Override
+    public Slice serialize(String element) {
+      return serializeStringWrapperCompatibleString(element);
+    }
+
+    @Override
+    public String deserialize(Slice input) {
+      return deserializeStringWrapperCompatibleString(input);
+    }
+
+    private static Slice serializeStringWrapperCompatibleString(String element) {
+      if (element.isEmpty()) {
+        return EMPTY_SLICE;
+      }
+      ByteString utf8 = ByteString.copyFromUtf8(element);
+      byte[] header = new byte[1 + 5 + utf8.size()];
+      int position = 0;
+      // write the field tag
+      header[position++] = STRING_WRAPPER_FIELD_TYPE;
+      // write utf8 bytes.length as a VarInt.
+      int varIntLen = utf8.size();
+      while ((varIntLen & -128) != 0) {
+        header[position++] = ((byte) (varIntLen & 127 | 128));
+        varIntLen >>>= 7;
+      }
+      header[position++] = ((byte) varIntLen);
+      // concat header and the utf8 string bytes
+      ByteString headerBuf = MoreByteStrings.wrap(header, 0, position);
+      ByteString result = MoreByteStrings.concat(headerBuf, utf8);
+      return SliceProtobufUtil.asSlice(result);
+    }
+
+    private static String deserializeStringWrapperCompatibleString(Slice input) {
+      if (input.readableBytes() == 0) {
+        return "";
+      }
+      ByteString buf = SliceProtobufUtil.asByteString(input);
+      int position = 0;
+      // read field tag
+      if (buf.byteAt(position++) != STRING_WRAPPER_FIELD_TYPE) {
+        throw new IllegalStateException("Not a StringWrapper");
+      }
+      // read VarInt32 length
+      int shift = 0;
+      long varIntSize = 0;
+      while (shift < 32) {
+        final byte b = buf.byteAt(position++);
+        varIntSize |= (long) (b & 0x7F) << shift;
+        if ((b & 0x80) == 0) {
+          break;
+        }
+        shift += 7;
+      }
+      // sanity checks
+      if (varIntSize < 0 || varIntSize > Integer.MAX_VALUE) {
+        throw new IllegalStateException("Malformed VarInt");
+      }
+      final int endIndex = position + (int) varIntSize;
+      ByteString utf8Bytes = buf.substring(position, endIndex);
+      return utf8Bytes.toStringUtf8();
+    }
+  }
+
+  private static final class IntegerType implements Type<Integer> {
+
+    static final Type<Integer> INSTANCE = new IntegerType();
+
+    private final TypeSerializer<Integer> serializer = new IntegerTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return INTEGER_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Integer> typeSerializer() {
+      return serializer;
+    }
+
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  private static final class IntegerTypeSerializer implements TypeSerializer<Integer> {
+    private static final byte WRAPPER_TYPE_FIELD_TYPE =
+        protobufTagAsSingleByte(IntWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_FIXED32);
+
+    @Override
+    public Slice serialize(Integer element) {
+      return serializeIntegerWrapperCompatibleInt(element);
+    }
+
+    @Override
+    public Integer deserialize(Slice input) {
+      return deserializeIntegerWrapperCompatibleInt(input);
+    }
+
+    private static Slice serializeIntegerWrapperCompatibleInt(int n) {
+      if (n == 0) {
+        return EMPTY_SLICE;
+      }
+      byte[] out = new byte[5];
+      out[0] = WRAPPER_TYPE_FIELD_TYPE;
+      out[1] = (byte) (n & 0xFF);
+      out[2] = (byte) ((n >> 8) & 0xFF);
+      out[3] = (byte) ((n >> 16) & 0xFF);
+      out[4] = (byte) ((n >> 24) & 0xFF);
+      return Slices.wrap(out);
+    }
+
+    private static int deserializeIntegerWrapperCompatibleInt(Slice slice) {
+      if (slice.readableBytes() == 0) {
+        return 0;
+      }
+      if (slice.byteAt(0) != WRAPPER_TYPE_FIELD_TYPE) {
+        throw new IllegalStateException("Not an IntWrapper");
+      }
+      return slice.byteAt(1) & 0xFF
+          | (slice.byteAt(2) & 0xFF) << 8
+          | (slice.byteAt(3) & 0xFF) << 16
+          | (slice.byteAt(4) & 0xFF) << 24;
+    }
+  }
+
+  private static final class BooleanType implements Type<Boolean> {
+
+    static final Type<Boolean> INSTANCE = new BooleanType();
+
+    private final TypeSerializer<Boolean> serializer = new BooleanTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return BOOLEAN_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Boolean> typeSerializer() {
+      return serializer;
+    }
+
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  private static final class BooleanTypeSerializer implements TypeSerializer<Boolean> {
+    private static final Slice TRUE_SLICE =
+        toSlice(BooleanWrapper.newBuilder().setValue(true).build());
+    private static final Slice FALSE_SLICE =
+        toSlice(BooleanWrapper.newBuilder().setValue(false).build());
+    private static final byte WRAPPER_TYPE_TAG =
+        protobufTagAsSingleByte(BooleanWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_VARINT);
+
+    @Override
+    public Slice serialize(Boolean element) {
+      return element ? TRUE_SLICE : FALSE_SLICE;
+    }
+
+    @Override
+    public Boolean deserialize(Slice input) {
+      if (input.readableBytes() == 0) {
+        return false;
+      }
+      final byte tag = input.byteAt(0);
+      final byte value = input.byteAt(1);
+      if (tag != WRAPPER_TYPE_TAG || value != (byte) 1) {
+        throw new IllegalStateException("Not a BooleanWrapper value.");
+      }
+      return true;
+    }
+  }
+
+  private static final class FloatType implements Type<Float> {
+
+    static final Type<Float> INSTANCE = new FloatType();
+
+    private final TypeSerializer<Float> serializer = new FloatTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return FLOAT_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Float> typeSerializer() {
+      return serializer;
+    }
+
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  private static final class FloatTypeSerializer implements TypeSerializer<Float> {
+
+    @Override
+    public Slice serialize(Float element) {
+      FloatWrapper wrapper = FloatWrapper.newBuilder().setValue(element).build();
+      return toSlice(wrapper);
+    }
+
+    @Override
+    public Float deserialize(Slice input) {
+      try {
+        FloatWrapper wrapper = parseFrom(FloatWrapper.parser(), input);
+        return wrapper.getValue();
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
+  }
+
+  private static final class DoubleType implements Type<Double> {
+
+    static final Type<Double> INSTANCE = new DoubleType();
+
+    private final TypeSerializer<Double> serializer = new DoubleTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return DOUBLE_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Double> typeSerializer() {
+      return serializer;
+    }
+
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  private static final class DoubleTypeSerializer implements TypeSerializer<Double> {
+
+    @Override
+    public Slice serialize(Double element) {
+      DoubleWrapper wrapper = DoubleWrapper.newBuilder().setValue(element).build();
+      return toSlice(wrapper);
+    }
+
+    @Override
+    public Double deserialize(Slice input) {
+      try {
+        DoubleWrapper wrapper = parseFrom(DoubleWrapper.parser(), input);
+        return wrapper.getValue();
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalArgumentException(e);
+      }
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java
new file mode 100644
index 0000000..4c0a116
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.statefun.sdk.java.handler.TestUtils.modifiedValue;
+import static org.apache.flink.statefun.sdk.java.handler.TestUtils.protoFromValueSpec;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.handler.TestUtils.RequestBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.junit.Test;
+
+public class ConcurrentRequestReplyHandlerTest {
+
+  private static final TypeName GREETER_TYPE = TypeName.typeNameFromString("example/greeter");
+
+  private static final ValueSpec<Integer> SEEN_INT_SPEC =
+      ValueSpec.named("seen").thatExpiresAfterReadOrWrite(Duration.ofDays(1)).withIntType();
+
+  private static final StatefulFunctionSpec GREET_FN_SPEC =
+      StatefulFunctionSpec.builder(GREETER_TYPE)
+          .withValueSpec(SEEN_INT_SPEC)
+          .withSupplier(SimpleGreeter::new)
+          .build();
+
+  private final ConcurrentRequestReplyHandler handlerUnderTest =
+      new ConcurrentRequestReplyHandler(singletonMap(GREETER_TYPE, GREET_FN_SPEC));
+
+  @Test
+  public void simpleInvocationExample() {
+    ToFunction request =
+        new RequestBuilder()
+            .withTarget(GREETER_TYPE, "0")
+            .withState(SEEN_INT_SPEC, 1023)
+            .withInvocation(Types.stringType(), "Hello world")
+            .build();
+
+    FromFunction response = handlerUnderTest.handleInternally(request).join();
+
+    assertThat(response, notNullValue());
+  }
+
+  @Test
+  public void invocationWithoutStateDefinition() {
+    ToFunction request =
+        new RequestBuilder()
+            .withTarget(GREETER_TYPE, "0")
+            .withInvocation(Types.stringType(), "Hello world")
+            .build();
+
+    FromFunction response = handlerUnderTest.handleInternally(request).join();
+
+    assertThat(
+        response.getIncompleteInvocationContext().getMissingValuesList(),
+        hasItem(protoFromValueSpec(SEEN_INT_SPEC)));
+  }
+
+  @Test
+  public void multipleInvocations() {
+    ToFunction request =
+        new RequestBuilder()
+            .withTarget(GREETER_TYPE, "0")
+            .withState(SEEN_INT_SPEC, 0)
+            .withInvocation(Types.stringType(), "a")
+            .withInvocation(Types.stringType(), "b")
+            .withInvocation(Types.stringType(), "c")
+            .build();
+
+    FromFunction response = handlerUnderTest.handleInternally(request).join();
+
+    assertThat(
+        response.getInvocationResult().getStateMutationsList(),
+        hasItem(modifiedValue(SEEN_INT_SPEC, 3)));
+  }
+
+  private static final class SimpleGreeter implements StatefulFunction {
+
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message argument) {
+      int seen = context.storage().get(SEEN_INT_SPEC).orElse(0);
+      System.out.println("Hello there " + argument.asUtf8String() + " state=" + seen);
+
+      context.storage().set(SEEN_INT_SPEC, seen + 1);
+
+      return CompletableFuture.completedFuture(null);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java
new file mode 100644
index 0000000..d861d5a
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.MoreFutures.applySequentially;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.IntStream;
+import org.junit.Test;
+
+public class MoreFuturesTest {
+
+  @Test
+  public void usageExample() {
+    final ConcurrentLinkedDeque<String> results = new ConcurrentLinkedDeque<>();
+
+    CompletableFuture<?> allDone =
+        applySequentially(
+            Arrays.asList("a", "b", "c"),
+            letter -> CompletableFuture.runAsync(() -> results.addLast(letter)));
+
+    allDone.join();
+
+    assertThat(results, contains("a", "b", "c"));
+  }
+
+  @Test
+  public void firstThrows() {
+    CompletableFuture<Void> allDone =
+        applySequentially(Arrays.asList("a", "b", "c"), throwing("a"));
+
+    assertThat(allDone.isCompletedExceptionally(), is(true));
+  }
+
+  @Test
+  public void lastThrows() {
+    CompletableFuture<Void> allDone =
+        applySequentially(Arrays.asList("a", "b", "c"), throwing("c"));
+
+    assertThat(allDone.isCompletedExceptionally(), is(true));
+  }
+
+  @Test
+  public void bigList() {
+    Iterator<String> lotsOfInts =
+        IntStream.range(0, 1_000_000).mapToObj(String::valueOf).iterator();
+    Iterable<String> input = () -> lotsOfInts;
+
+    CompletableFuture<Void> allDone = applySequentially(input, throwing(null));
+
+    allDone.join();
+  }
+
+  private static MoreFutures.Fn<String, CompletableFuture<Void>> throwing(String when) {
+    return letter -> {
+      if (letter.equals(when)) {
+        throw new IllegalStateException("I'm a throwing function");
+      } else {
+        return CompletableFuture.completedFuture(null);
+        //        return CompletableFuture.runAsync(
+        //            () -> {
+        ////              try {
+        ////                Thread.sleep(1_00);
+        ////              } catch (InterruptedException e) {
+        ////                e.printStackTrace();
+        ////              }
+        //              /* complete successfully */
+        //            });
+      }
+    };
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java
new file mode 100644
index 0000000..43cb4f1
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+
+public class TestUtils {
+  private TestUtils() {}
+
+  public static <T> Matcher<FromFunction.PersistedValueMutation> modifiedValue(
+      ValueSpec<T> spec, T newValue) {
+    FromFunction.PersistedValueMutation mutation =
+        FromFunction.PersistedValueMutation.newBuilder()
+            .setStateName(spec.name())
+            .setMutationType(FromFunction.PersistedValueMutation.MutationType.MODIFY)
+            .setStateValue(typedValue(spec.type(), newValue))
+            .build();
+
+    return Matchers.is(mutation);
+  }
+
+  public static FromFunction.PersistedValueSpec protoFromValueSpec(ValueSpec<?> spec) {
+    return ProtoUtils.protoFromValueSpec(spec).build();
+  }
+
+  public static <T> TypedValue.Builder typedValue(Type<T> type, T value) {
+    Slice serializedValue = type.typeSerializer().serialize(value);
+    return TypedValue.newBuilder()
+        .setTypename(type.typeName().asTypeNameString())
+        .setHasValue(value != null)
+        .setValue(SliceProtobufUtil.asByteString(serializedValue));
+  }
+
+  /** A test utility to build ToFunction messages using SDK concepts. */
+  public static final class RequestBuilder {
+
+    private final ToFunction.InvocationBatchRequest.Builder builder =
+        ToFunction.InvocationBatchRequest.newBuilder();
+
+    public RequestBuilder withTarget(TypeName target, String id) {
+      builder.setTarget(
+          Address.newBuilder().setNamespace(target.namespace()).setType(target.name()).setId(id));
+      return this;
+    }
+
+    public <T> RequestBuilder withState(ValueSpec<T> spec) {
+      builder.addState(ToFunction.PersistedValue.newBuilder().setStateName(spec.name()));
+
+      return this;
+    }
+
+    public <T> RequestBuilder withState(ValueSpec<T> spec, T value) {
+      builder.addState(
+          ToFunction.PersistedValue.newBuilder()
+              .setStateName(spec.name())
+              .setStateValue(typedValue(spec.type(), value)));
+
+      return this;
+    }
+
+    public <T> RequestBuilder withInvocation(Type<T> type, T value) {
+      builder.addInvocations(
+          ToFunction.Invocation.newBuilder().setArgument(typedValue(type, value)));
+      return this;
+    }
+
+    public ToFunction build() {
+      return ToFunction.newBuilder().setInvocation(builder).build();
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java
new file mode 100644
index 0000000..f0ba3c4
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.Test;
+
+public class SliceOutputTest {
+
+  private static final byte[] TEST_BYTES = new byte[] {1, 2, 3, 4, 5};
+
+  @Test
+  public void usageExample() {
+    SliceOutput output = SliceOutput.sliceOutput(32);
+
+    output.write(TEST_BYTES);
+    Slice slice = output.copyOf();
+
+    assertArrayEquals(TEST_BYTES, slice.toByteArray());
+  }
+
+  @Test
+  public void sliceShouldAutoGrow() {
+    SliceOutput sliceOutput = SliceOutput.sliceOutput(0);
+
+    sliceOutput.write(TEST_BYTES);
+    Slice slice = sliceOutput.copyOf();
+    byte[] got = new byte[slice.readableBytes()];
+    slice.copyTo(got);
+
+    assertArrayEquals(got, TEST_BYTES);
+  }
+
+  @Test
+  public void writeStartingAtOffset() {
+    SliceOutput output = SliceOutput.sliceOutput(32);
+
+    output.write(TEST_BYTES);
+    Slice slice = output.copyOf();
+
+    byte[] slightlyBiggerBuf = new byte[1 + slice.readableBytes()];
+    slice.copyTo(slightlyBiggerBuf, 1);
+
+    byte[] got = deleteFirstByte(slightlyBiggerBuf);
+    assertArrayEquals(TEST_BYTES, got);
+  }
+
+  @Test
+  public void randomizedTest() {
+    ThreadLocalRandom random = ThreadLocalRandom.current();
+    for (int i = 0; i < 1_000_000; i++) {
+      // create a random buffer of a random size.
+      final int size = random.nextInt(0, 32);
+      byte[] buf = randomBuffer(random, size);
+      // write the buffer in random chunks
+      SliceOutput sliceOutput = SliceOutput.sliceOutput(0);
+      for (OffsetLenPair chunk : randomChunks(random, size)) {
+        sliceOutput.write(buf, chunk.offset, chunk.len);
+      }
+      // verify
+      Slice slice = sliceOutput.copyOf();
+      assertArrayEquals(buf, slice.toByteArray());
+    }
+  }
+
+  @Test
+  public void copyFromInputStreamRandomizedTest() {
+    byte[] expected = randomBuffer(ThreadLocalRandom.current(), 256);
+    ByteArrayInputStream input = new ByteArrayInputStream(expected);
+
+    Slice slice = Slices.copyOf(input, 0);
+
+    byte[] actual = slice.toByteArray();
+
+    assertArrayEquals(expected, actual);
+  }
+
+  @Test
+  public void toOutputStreamTest() {
+    byte[] expected = randomBuffer(ThreadLocalRandom.current(), 256);
+
+    Slice slice = Slices.wrap(expected);
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream(expected.length);
+    slice.copyTo(output);
+
+    assertArrayEquals(expected, output.toByteArray());
+  }
+
+  private static byte[] deleteFirstByte(byte[] slightlyBiggerBuf) {
+    return Arrays.copyOfRange(slightlyBiggerBuf, 1, slightlyBiggerBuf.length);
+  }
+
+  private static byte[] randomBuffer(ThreadLocalRandom random, int size) {
+    byte[] buf = new byte[size];
+    random.nextBytes(buf);
+    return buf;
+  }
+
+  /** Compute a random partition of @size to pairs of (offset, len). */
+  private static List<OffsetLenPair> randomChunks(ThreadLocalRandom random, int size) {
+    ArrayList<OffsetLenPair> chunks = new ArrayList<>();
+    int offset = 0;
+    while (offset != size) {
+      int remaining = size - offset;
+      int toWrite = random.nextInt(remaining + 1);
+      chunks.add(new OffsetLenPair(offset, toWrite));
+      offset += toWrite;
+    }
+    return chunks;
+  }
+
+  private static final class OffsetLenPair {
+    final int offset;
+    final int len;
+
+    public OffsetLenPair(int offset, int len) {
+      this.offset = offset;
+      this.len = len;
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java
new file mode 100644
index 0000000..85f21c2
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import static org.junit.Assert.assertSame;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+public class SliceProtobufUtilTest {
+
+  @Test
+  public void usageExample() {
+    ByteString expected = ByteString.copyFromUtf8("Hello world");
+
+    Slice slice = SliceProtobufUtil.asSlice(expected);
+    ByteString got = SliceProtobufUtil.asByteString(slice);
+
+    assertSame("Expecting the same reference.", expected, got);
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java
new file mode 100644
index 0000000..0afaa02
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.junit.Test;
+
+public class ConcurrentAddressScopedStorageTest {
+
+  @Test
+  public void exampleUsage() {
+    final ValueSpec<Integer> stateSpec1 = ValueSpec.named("state-1").withIntType();
+    final ValueSpec<Boolean> stateSpec2 = ValueSpec.named("state-2").withBooleanType();
+
+    final List<StateValueContext<?>> testStateValues =
+        testStateValues(stateValue(stateSpec1, 91), stateValue(stateSpec2, true));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    assertThat(storage.get(stateSpec1), is(Optional.of(91)));
+    assertThat(storage.get(stateSpec2), is(Optional.of(true)));
+  }
+
+  @Test
+  public void getNullValueCell() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, null));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    assertThat(storage.get(stateSpec), is(Optional.empty()));
+  }
+
+  @Test
+  public void setCell() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.set(stateSpec, 1111);
+
+    assertThat(storage.get(stateSpec), is(Optional.of(1111)));
+  }
+
+  @Test
+  public void setMutableTypeCell() {
+    final ValueSpec<TestMutableType.Type> stateSpec =
+        ValueSpec.named("state").withCustomType(new TestMutableType());
+
+    final List<StateValueContext<?>> testStateValues =
+        testStateValues(stateValue(stateSpec, new TestMutableType.Type("hello")));
+
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+    final TestMutableType.Type newValue = new TestMutableType.Type("hello again!");
+    storage.set(stateSpec, newValue);
+
+    // mutations after a set should not have any effect
+    newValue.mutate("this value should not be written to storage!");
+    assertThat(storage.get(stateSpec), is(Optional.of(new TestMutableType.Type("hello again!"))));
+  }
+
+  @Test
+  public void clearCell() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.remove(stateSpec);
+
+    assertThat(storage.get(stateSpec), is(Optional.empty()));
+  }
+
+  @Test
+  public void clearMutableTypeCell() {
+    final ValueSpec<TestMutableType.Type> stateSpec =
+        ValueSpec.named("state").withCustomType(new TestMutableType());
+
+    List<StateValueContext<?>> testStateValues =
+        testStateValues(stateValue(stateSpec, new TestMutableType.Type("hello")));
+
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.remove(stateSpec);
+
+    assertThat(storage.get(stateSpec), is(Optional.empty()));
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void getNonExistingCell() {
+    final AddressScopedStorage storage =
+        new ConcurrentAddressScopedStorage(Collections.emptyList());
+
+    storage.get(ValueSpec.named("doesn't-exist").withIntType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setNonExistingCell() {
+    final AddressScopedStorage storage =
+        new ConcurrentAddressScopedStorage(Collections.emptyList());
+
+    storage.set(ValueSpec.named("doesn't-exist").withIntType(), 999);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void clearNonExistingCell() {
+    final AddressScopedStorage storage =
+        new ConcurrentAddressScopedStorage(Collections.emptyList());
+
+    storage.remove(ValueSpec.named("doesn't-exist").withIntType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setToNull() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.set(stateSpec, null);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void getWithWrongType() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.get(ValueSpec.named("state").withBooleanType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setWithWrongType() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.set(ValueSpec.named("state").withBooleanType(), true);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void clearWithWrongType() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.remove(ValueSpec.named("state").withBooleanType());
+  }
+
+  private static List<StateValueContext<?>> testStateValues(StateValueContext<?>... testValues) {
+    return Arrays.asList(testValues);
+  }
+
+  private static <T> StateValueContext<T> stateValue(ValueSpec<T> spec, T value) {
+    final ToFunction.PersistedValue protocolValue =
+        ToFunction.PersistedValue.newBuilder()
+            .setStateName(spec.name())
+            .setStateValue(
+                TypedValue.newBuilder()
+                    .setTypename(spec.type().typeName().asTypeNameString())
+                    .setHasValue(value != null)
+                    .setValue(toByteString(spec.type(), value)))
+            .build();
+
+    return new StateValueContext<>(spec, protocolValue);
+  }
+
+  private static <T> ByteString toByteString(Type<T> type, T value) {
+    if (value == null) {
+      return ByteString.EMPTY;
+    }
+    return ByteString.copyFrom(type.typeSerializer().serialize(value).toByteArray());
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java
new file mode 100644
index 0000000..a9c3902
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+public final class StateValueContextsTest {
+
+  @Test
+  public void exampleUsage() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(2);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+    registeredSpecs.put("state-2", ValueSpec.named("state-2").withBooleanType());
+
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(2);
+    providedProtocolValues.add(protocolValue("state-1", Types.integerType(), 66));
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+
+    final List<StateValueContext<?>> resolvedStateValues =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).resolved();
+
+    assertThat(resolvedStateValues.size(), is(2));
+    assertThat(resolvedStateValues, hasItem(stateValueContextNamed("state-1")));
+    assertThat(resolvedStateValues, hasItem(stateValueContextNamed("state-2")));
+  }
+
+  @Test
+  public void missingProtocolValues() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(3);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+    registeredSpecs.put("state-2", ValueSpec.named("state-2").withBooleanType());
+    registeredSpecs.put("state-3", ValueSpec.named("state-3").withUtf8String());
+
+    // only value for state-2 was provided
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(1);
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+
+    final List<ValueSpec<?>> statesWithMissingValue =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).missingValues();
+
+    assertThat(statesWithMissingValue.size(), is(2));
+    assertThat(statesWithMissingValue, hasItem(valueSpec("state-1", Types.integerType())));
+    assertThat(statesWithMissingValue, hasItem(valueSpec("state-3", Types.stringType())));
+  }
+
+  @Test
+  public void extraProtocolValues() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(1);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+
+    // a few extra states were provided, and should be ignored
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(3);
+    providedProtocolValues.add(protocolValue("state-1", Types.integerType(), 66));
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+    providedProtocolValues.add(protocolValue("state-3", Types.stringType(), "ignore me!"));
+
+    final List<StateValueContext<?>> resolvedStateValues =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).resolved();
+
+    assertThat(resolvedStateValues.size(), is(1));
+    ValueSpec<?> spec = resolvedStateValues.get(0).spec();
+    assertThat(spec.name(), Matchers.is("state-1"));
+  }
+
+  private static <T> ToFunction.PersistedValue protocolValue(
+      String stateName, Type<T> type, T value) {
+    return ToFunction.PersistedValue.newBuilder()
+        .setStateName(stateName)
+        .setStateValue(
+            TypedValue.newBuilder()
+                .setTypename(type.typeName().asTypeNameString())
+                .setHasValue(value != null)
+                .setValue(toByteString(type, value)))
+        .build();
+  }
+
+  private static <T> ByteString toByteString(Type<T> type, T value) {
+    if (value == null) {
+      return ByteString.EMPTY;
+    }
+    return ByteString.copyFrom(type.typeSerializer().serialize(value).toByteArray());
+  }
+
+  private static <T> Matcher<ValueSpec<T>> valueSpec(String stateName, Type<T> type) {
+    return new TypeSafeMatcher<ValueSpec<T>>() {
+      @Override
+      protected boolean matchesSafely(ValueSpec<T> testSpec) {
+        return testSpec.type().getClass() == type.getClass() && testSpec.name().equals(stateName);
+      }
+
+      @Override
+      public void describeTo(Description description) {}
+    };
+  }
+
+  private static <T> Matcher<StateValueContext<T>> stateValueContextNamed(String name) {
+    return new TypeSafeDiagnosingMatcher<StateValueContext<T>>() {
+      @Override
+      protected boolean matchesSafely(StateValueContext<T> ctx, Description description) {
+        if (!Objects.equals(ctx.spec().name(), name)) {
+          description.appendText(ctx.spec().name());
+          return false;
+        }
+        return true;
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("A StateValueContext named ").appendText(name);
+      }
+    };
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java
new file mode 100644
index 0000000..6c31b00
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+
+public class TestMutableType implements Type<TestMutableType.Type> {
+
+  @Override
+  public TypeName typeName() {
+    return TypeName.typeNameOf("test", "my-mutable-type");
+  }
+
+  @Override
+  public TypeSerializer<TestMutableType.Type> typeSerializer() {
+    return new Serializer();
+  }
+
+  public static class Type {
+    private String value;
+
+    public Type(String value) {
+      this.value = value;
+    }
+
+    public void mutate(String newValue) {
+      this.value = newValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+      Type type = (Type) o;
+      return Objects.equals(value, type.value);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(value);
+    }
+  }
+
+  private static class Serializer implements TypeSerializer<TestMutableType.Type> {
+    @Override
+    public Slice serialize(Type value) {
+      return Slices.wrap(value.value.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Type deserialize(Slice bytes) {
+      return new Type(new String(bytes.toByteArray(), StandardCharsets.UTF_8));
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java
new file mode 100644
index 0000000..90f4ca1
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.types.generated.BooleanWrapper;
+import org.apache.flink.statefun.sdk.types.generated.IntWrapper;
+import org.apache.flink.statefun.sdk.types.generated.LongWrapper;
+import org.apache.flink.statefun.sdk.types.generated.StringWrapper;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class SanityPrimitiveTypeTest {
+
+  @Test
+  public void testBoolean() {
+    assertRoundTrip(Types.booleanType(), Boolean.TRUE);
+    assertRoundTrip(Types.booleanType(), Boolean.FALSE);
+  }
+
+  @Test
+  public void testInt() {
+    assertRoundTrip(Types.integerType(), 1);
+    assertRoundTrip(Types.integerType(), 1048576);
+    assertRoundTrip(Types.integerType(), Integer.MIN_VALUE);
+    assertRoundTrip(Types.integerType(), Integer.MAX_VALUE);
+    assertRoundTrip(Types.integerType(), -1);
+  }
+
+  @Test
+  public void testLong() {
+    assertRoundTrip(Types.longType(), -1L);
+    assertRoundTrip(Types.longType(), 0L);
+    assertRoundTrip(Types.longType(), Long.MIN_VALUE);
+    assertRoundTrip(Types.longType(), Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testFloat() {
+    assertRoundTrip(Types.floatType(), Float.MIN_VALUE);
+    assertRoundTrip(Types.floatType(), Float.MAX_VALUE);
+    assertRoundTrip(Types.floatType(), 2.1459f);
+    assertRoundTrip(Types.floatType(), -1e-4f);
+  }
+
+  @Test
+  public void testDouble() {
+    assertRoundTrip(Types.doubleType(), Double.MIN_VALUE);
+    assertRoundTrip(Types.doubleType(), Double.MAX_VALUE);
+    assertRoundTrip(Types.doubleType(), 2.1459d);
+    assertRoundTrip(Types.doubleType(), -1e-4d);
+  }
+
+  @Test
+  public void testString() {
+    assertRoundTrip(Types.stringType(), "");
+    assertRoundTrip(Types.stringType(), "This is a string");
+  }
+
+  @Test
+  public void testRandomCompatibilityWithAnIntegerWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<Integer> serializer = Types.integerType().typeSerializer();
+    for (int i = 0; i < 1_000_000; i++) {
+      testCompatibilityWithWrapper(
+          serializer, IntWrapper::parseFrom, IntWrapper::getValue, IntWrapper::toByteArray, i);
+    }
+  }
+
+  @Test
+  public void testCompatibilityWithABooleanWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<Boolean> serializer = Types.booleanType().typeSerializer();
+    testCompatibilityWithWrapper(
+        serializer,
+        BooleanWrapper::parseFrom,
+        BooleanWrapper::getValue,
+        BooleanWrapper::toByteArray,
+        true);
+
+    testCompatibilityWithWrapper(
+        serializer,
+        BooleanWrapper::parseFrom,
+        BooleanWrapper::getValue,
+        BooleanWrapper::toByteArray,
+        false);
+  }
+
+  @Test
+  public void testRandomCompatibilityWithALongWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<Long> serializer = Types.longType().typeSerializer();
+    for (long i = 0; i < 1_000_000; i++) {
+      testCompatibilityWithWrapper(
+          serializer, LongWrapper::parseFrom, LongWrapper::getValue, LongWrapper::toByteArray, i);
+    }
+  }
+
+  @Ignore
+  @Test
+  public void testCompatibilityWithAnIntegerWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<Integer> serializer = Types.integerType().typeSerializer();
+    for (int expected = Integer.MIN_VALUE; expected != Integer.MAX_VALUE; expected++) {
+      testCompatibilityWithWrapper(
+          serializer,
+          IntWrapper::parseFrom,
+          IntWrapper::getValue,
+          IntWrapper::toByteArray,
+          expected);
+    }
+  }
+
+  @Test
+  public void testRandomCompatibilityWithStringWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<String> serializer = Types.stringType().typeSerializer();
+    ThreadLocalRandom random = ThreadLocalRandom.current();
+    for (int i = 0; i < 1_000; i++) {
+      int n = random.nextInt(4096);
+      byte[] buf = new byte[n];
+      random.nextBytes(buf);
+      String expected = new String(buf, StandardCharsets.UTF_8);
+
+      testCompatibilityWithWrapper(
+          serializer,
+          StringWrapper::parseFrom,
+          StringWrapper::getValue,
+          StringWrapper::toByteArray,
+          expected);
+    }
+  }
+
+  @FunctionalInterface
+  interface Fn<I, O> {
+    O apply(I input) throws InvalidProtocolBufferException;
+  }
+
+  private static <T, W> void testCompatibilityWithWrapper(
+      TypeSerializer<T> serializer,
+      Fn<ByteBuffer, W> parseFrom,
+      Fn<W, T> getValue,
+      Fn<W, byte[]> toByteArray,
+      T expected)
+      throws InvalidProtocolBufferException {
+    //
+    // test round trip with ourself.
+    //
+    final Slice serialized = serializer.serialize(expected);
+    final T got = serializer.deserialize(serialized);
+    assertEquals(expected, got);
+    //
+    // test that protobuf can parse what we wrote:
+    //
+    final W wrapper = parseFrom.apply(serialized.asReadOnlyByteBuffer());
+    assertEquals(expected, getValue.apply(wrapper));
+    //
+    // test that we can parse what protobuf wrote:
+    //
+    final Slice serializedByPb = Slices.wrap(toByteArray.apply(wrapper));
+    final T gotPb = serializer.deserialize(serializedByPb);
+    assertEquals(gotPb, expected);
+    // test that pb byte representation is equal to ours:
+    assertEquals(serializedByPb.asReadOnlyByteBuffer(), serialized.asReadOnlyByteBuffer());
+  }
+
+  public <T> void assertRoundTrip(Type<T> type, T element) {
+    final Slice slice;
+    {
+      TypeSerializer<T> serializer = type.typeSerializer();
+      slice = serializer.serialize(element);
+    }
+    TypeSerializer<T> serializer = type.typeSerializer();
+    T got = serializer.deserialize(slice);
+    assertEquals(element, got);
+  }
+}