You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tz...@apache.org on 2021/02/24 11:30:23 UTC

[flink-statefun] 01/01: [FLINK-21459] Implement remote Java SDK for Stateful Functions

This is an automated email from the ASF dual-hosted git repository.

tzulitai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 20b521e5c131c914617008033bd4c63fba81fe97
Author: Igal Shilman <ig...@gmail.com>
AuthorDate: Tue Feb 23 18:09:36 2021 +0800

    [FLINK-21459] Implement remote Java SDK for Stateful Functions
    
    Add the initial implementation of the Java SDK on top of the new remote invocation protocol.
    
    Co-authored-by: Tzu-Li (Gordon) Tai <tz...@apache.org>
    
    This closes #201.
---
 statefun-sdk-java/pom.xml                          |  19 +-
 .../java/com/google/protobuf/MoreByteStrings.java  |  39 ++
 .../apache/flink/statefun/sdk/java/Address.java    |  85 ++++
 .../statefun/sdk/java/AddressScopedStorage.java    |  28 ++
 .../flink/statefun/sdk/java/ApiExtension.java      |  34 ++
 .../apache/flink/statefun/sdk/java/Context.java    |  43 ++
 .../apache/flink/statefun/sdk/java/Expiration.java |  92 +++++
 .../flink/statefun/sdk/java/StatefulFunction.java  |  26 ++
 .../statefun/sdk/java/StatefulFunctionSpec.java    |  78 ++++
 .../flink/statefun/sdk/java/StatefulFunctions.java |  46 +++
 .../apache/flink/statefun/sdk/java/TypeName.java   | 137 +++++++
 .../apache/flink/statefun/sdk/java/ValueSpec.java  | 115 ++++++
 .../statefun/sdk/java/annotations/Internal.java    |  30 ++
 .../sdk/java/handler/ConcurrentContext.java        | 147 +++++++
 .../handler/ConcurrentRequestReplyHandler.java     | 150 +++++++
 .../statefun/sdk/java/handler/MoreFutures.java     |  66 +++
 .../statefun/sdk/java/handler/ProtoUtils.java      |  97 +++++
 .../sdk/java/handler/RequestReplyHandler.java      |  34 ++
 .../statefun/sdk/java/io/KafkaEgressMessage.java   | 127 ++++++
 .../statefun/sdk/java/io/KinesisEgressMessage.java | 146 +++++++
 .../statefun/sdk/java/message/EgressMessage.java   |  30 ++
 .../sdk/java/message/EgressMessageWrapper.java     |  55 +++
 .../flink/statefun/sdk/java/message/Message.java   |  60 +++
 .../statefun/sdk/java/message/MessageBuilder.java  | 113 +++++
 .../statefun/sdk/java/message/MessageWrapper.java  | 133 ++++++
 .../statefun/sdk/java/slice/ByteStringSlice.java   |  80 ++++
 .../flink/statefun/sdk/java/slice/Slice.java       |  40 ++
 .../flink/statefun/sdk/java/slice/SliceOutput.java | 108 +++++
 .../statefun/sdk/java/slice/SliceProtobufUtil.java |  55 +++
 .../flink/statefun/sdk/java/slice/Slices.java      |  61 +++
 .../storage/ConcurrentAddressScopedStorage.java    | 347 ++++++++++++++++
 .../storage/IllegalStorageAccessException.java     |  28 ++
 .../sdk/java/storage/StateValueContexts.java       | 131 ++++++
 .../flink/statefun/sdk/java/types/SimpleType.java  | 104 +++++
 .../apache/flink/statefun/sdk/java/types/Type.java |  33 ++
 .../sdk/java/types/TypeCharacteristics.java        |  22 +
 .../statefun/sdk/java/types/TypeSerializer.java    |  27 ++
 .../flink/statefun/sdk/java/types/Types.java       | 456 +++++++++++++++++++++
 .../handler/ConcurrentRequestReplyHandlerTest.java | 116 ++++++
 .../statefun/sdk/java/handler/MoreFuturesTest.java |  93 +++++
 .../flink/statefun/sdk/java/handler/TestUtils.java |  96 +++++
 .../statefun/sdk/java/slice/SliceOutputTest.java   | 144 +++++++
 .../sdk/java/slice/SliceProtobufUtilTest.java      |  36 ++
 .../ConcurrentAddressScopedStorageTest.java        | 206 ++++++++++
 .../sdk/java/storage/StateValueContextsTest.java   | 150 +++++++
 .../statefun/sdk/java/storage/TestMutableType.java |  77 ++++
 .../sdk/java/types/SanityPrimitiveTypeTest.java    | 194 +++++++++
 47 files changed, 4529 insertions(+), 5 deletions(-)

diff --git a/statefun-sdk-java/pom.xml b/statefun-sdk-java/pom.xml
index 62bb8de..f58829a 100644
--- a/statefun-sdk-java/pom.xml
+++ b/statefun-sdk-java/pom.xml
@@ -24,13 +24,10 @@ under the License.
         <version>2.3-SNAPSHOT</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-
     <artifactId>statefun-sdk-java</artifactId>
-
     <properties>
         <additional-sources.dir>target/additional-sources</additional-sources.dir>
     </properties>
-
     <dependencies>
         <dependency>
             <groupId>org.apache.flink</groupId>
@@ -42,8 +39,20 @@ under the License.
             <artifactId>protobuf-java</artifactId>
             <version>${protobuf.version}</version>
         </dependency>
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
-
     <build>
         <plugins>
             <!--
@@ -145,4 +154,4 @@ under the License.
             </plugin>
         </plugins>
     </build>
-</project>
+</project>
\ No newline at end of file
diff --git a/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java b/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java
new file mode 100644
index 0000000..9c4269e
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/com/google/protobuf/MoreByteStrings.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.protobuf;
+
+import java.nio.ByteBuffer;
+
+public class MoreByteStrings {
+
+  public static ByteString wrap(byte[] bytes) {
+    return ByteString.wrap(bytes);
+  }
+
+  public static ByteString wrap(byte[] bytes, int offset, int len) {
+    return ByteString.wrap(bytes, offset, len);
+  }
+
+  public static ByteString wrap(ByteBuffer buffer) {
+    return ByteString.wrap(buffer);
+  }
+
+  public static ByteString concat(ByteString first, ByteString second) {
+    return first.concat(second);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java
new file mode 100644
index 0000000..86dbe76
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Address.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.Objects;
+
+/**
+ * An {@link Address} is the unique identity of an individual {@link StatefulFunction}, consisting
+ * of the function's {@link TypeName} and a unique identifier within the type. The function's type
+ * denotes the class of function to invoke, while the unique identifier addresses the invocation to
+ * a specific function instance.
+ */
+public final class Address {
+  private final TypeName type;
+  private final String id;
+
+  /**
+   * Creates an {@link Address}.
+   *
+   * @param type type of the function.
+   * @param id unique id within the function type.
+   */
+  public Address(TypeName type, String id) {
+    this.type = Objects.requireNonNull(type);
+    this.id = Objects.requireNonNull(id);
+  }
+
+  /**
+   * Returns the {@link TypeName} that this address identifies.
+   *
+   * @return type of the function
+   */
+  public TypeName type() {
+    return type;
+  }
+
+  /**
+   * Returns the unique function id, within its type, that this address identifies.
+   *
+   * @return unique id within the function type.
+   */
+  public String id() {
+    return id;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Address address = (Address) o;
+    return type.equals(address.type) && id.equals(address.id);
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 0;
+    hash = 37 * hash + type.hashCode();
+    hash = 37 * hash + id.hashCode();
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Address(%s, %s, %s)", type.namespace(), type.name(), id);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
new file mode 100644
index 0000000..d04310a
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/AddressScopedStorage.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.Optional;
+
+public interface AddressScopedStorage {
+  <T> Optional<T> get(ValueSpec<T> descriptor);
+
+  <T> void set(ValueSpec<T> key, T value);
+
+  <T> void remove(ValueSpec<T> key);
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java
new file mode 100644
index 0000000..2d4ebe4
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ApiExtension.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+
+@Internal
+public final class ApiExtension {
+
+  public static ByteString typeNameByteString(TypeName typeName) {
+    return typeName.typeNameByteString();
+  }
+
+  public static ByteString stateNameByteString(ValueSpec<?> spec) {
+    return spec.nameByteString();
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
new file mode 100644
index 0000000..9bc011b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Context.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.time.Duration;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+public interface Context {
+
+  Address self();
+
+  Optional<Address> caller();
+
+  void send(Message message);
+
+  void sendAfter(Duration duration, Message message);
+
+  void send(EgressMessage message);
+
+  AddressScopedStorage storage();
+
+  default CompletableFuture<Void> done() {
+    return CompletableFuture.completedFuture(null);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
new file mode 100644
index 0000000..060c7ab
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/Expiration.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.util.Objects;
+
+/**
+ * State Expiration Configuration
+ *
+ * <p>This class defines the way state can be auto expired by the runtime. State expiration (also
+ * known as state TTL) can be used to keep state from growing arbitrarily by assigning an expiration
+ * date to a value.
+ *
+ * <p>State can be expired after a duration has passed since either the last write to the
+ * state, or the last read.
+ */
+public final class Expiration implements Serializable {
+
+  private static final long serialVersionUID = 1L;
+
+  public enum Mode {
+    NONE,
+    AFTER_WRITE,
+    AFTER_READ_OR_WRITE;
+  }
+
+  /**
+   * Returns an Expiration configuration that would expire a @duration after the last write.
+   *
+   * @param duration a duration to wait before considering the state expired.
+   */
+  public static Expiration expireAfterWriting(Duration duration) {
+    return new Expiration(Mode.AFTER_WRITE, duration);
+  }
+
+  /**
+   * Returns an Expiration configuration that would expire a @duration after the last write or read.
+   *
+   * @param duration a duration to wait before considering the state expired.
+   */
+  public static Expiration expireAfterReadingOrWriting(Duration duration) {
+    return new Expiration(Mode.AFTER_READ_OR_WRITE, duration);
+  }
+
+  public static Expiration expireAfter(Duration duration, Mode mode) {
+    return new Expiration(mode, duration);
+  }
+
+  /** @return an expiration configuration with expiration disabled ({@link Mode#NONE}). */
+  public static Expiration none() {
+    return new Expiration(Mode.NONE, Duration.ZERO);
+  }
+
+  private final Mode mode;
+  private final Duration duration;
+
+  private Expiration(Mode mode, Duration duration) {
+    this.mode = Objects.requireNonNull(mode);
+    this.duration = Objects.requireNonNull(duration);
+  }
+
+  public Mode mode() {
+    return mode;
+  }
+
+  public Duration duration() {
+    return duration;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("Expiration{mode=%s, duration=%s}", mode, duration);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
new file mode 100644
index 0000000..bd9aef6
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunction.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.message.Message;
+
+public interface StatefulFunction {
+
+  CompletableFuture<Void> apply(Context context, Message argument) throws Throwable;
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
new file mode 100644
index 0000000..b7dc7bc
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctionSpec.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Supplier;
+
+public final class StatefulFunctionSpec {
+  private final TypeName typeName;
+  private final Map<String, ValueSpec<?>> knownValues;
+  private final Supplier<? extends StatefulFunction> supplier;
+
+  public static Builder builder(TypeName typeName) {
+    return new Builder(typeName);
+  }
+
+  private StatefulFunctionSpec(
+      TypeName typeName,
+      Map<String, ValueSpec<?>> knownValues,
+      Supplier<? extends StatefulFunction> supplier) {
+    this.typeName = Objects.requireNonNull(typeName);
+    this.supplier = Objects.requireNonNull(supplier);
+    this.knownValues = Objects.requireNonNull(knownValues);
+  }
+
+  public TypeName typeName() {
+    return typeName;
+  }
+
+  public Map<String, ValueSpec<?>> knownValues() {
+    return knownValues;
+  }
+
+  public Supplier<? extends StatefulFunction> supplier() {
+    return supplier;
+  }
+
+  public static final class Builder {
+    private final TypeName typeName;
+    private final Map<String, ValueSpec<?>> knownValues = new HashMap<>();
+    private Supplier<? extends StatefulFunction> supplier;
+
+    private Builder(TypeName typeName) {
+      this.typeName = Objects.requireNonNull(typeName);
+    }
+
+    public Builder withValueSpec(ValueSpec<?> valueSpec) {
+      knownValues.put(valueSpec.name(), valueSpec);
+      return this;
+    }
+
+    public Builder withSupplier(Supplier<? extends StatefulFunction> supplier) {
+      this.supplier = Objects.requireNonNull(supplier);
+      return this;
+    }
+
+    public StatefulFunctionSpec build() {
+      return new StatefulFunctionSpec(typeName, knownValues, supplier);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
new file mode 100644
index 0000000..9561827
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/StatefulFunctions.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.flink.statefun.sdk.java.handler.ConcurrentRequestReplyHandler;
+import org.apache.flink.statefun.sdk.java.handler.RequestReplyHandler;
+
+public class StatefulFunctions {
+  private final Map<TypeName, StatefulFunctionSpec> specs = new HashMap<>();
+
+  public StatefulFunctions withStatefulFunction(StatefulFunctionSpec.Builder builder) {
+    StatefulFunctionSpec spec = builder.build();
+    specs.put(spec.typeName(), spec);
+    return this;
+  }
+
+  public StatefulFunctions withStatefulFunction(StatefulFunctionSpec spec) {
+    specs.put(spec.typeName(), spec);
+    return this;
+  }
+
+  public Map<TypeName, StatefulFunctionSpec> functionSpecs() {
+    return specs;
+  }
+
+  public RequestReplyHandler requestReplyHandler() {
+    return new ConcurrentRequestReplyHandler(specs);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
new file mode 100644
index 0000000..ff7fde5
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/TypeName.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * This class represents the type of a {@code StatefulFunction}, consisting of a namespace of the
+ * function type as well as the type's name.
+ *
+ * <p>A function's type is part of a function's {@link Address} and serves as an integral part of
+ * an individual function's identity.
+ *
+ * @see Address
+ */
+public final class TypeName implements Serializable {
+
+  private static final long serialVersionUID = 1;
+
+  private final String namespace;
+  private final String type;
+  private final String typenameString;
+  private final ByteString typenameByteString;
+
+  public static TypeName typeNameOf(String namespace, String name) {
+    Objects.requireNonNull(namespace);
+    Objects.requireNonNull(name);
+    if (namespace.endsWith("/")) {
+      namespace = namespace.substring(0, namespace.length() - 1);
+    }
+    if (namespace.isEmpty()) {
+      throw new IllegalArgumentException("namespace can not be empty.");
+    }
+    if (name.isEmpty()) {
+      throw new IllegalArgumentException("name can not be empty.");
+    }
+    return new TypeName(namespace, name);
+  }
+
+  public static TypeName typeNameFromString(String typeNameString) {
+    Objects.requireNonNull(typeNameString);
+    final int pos = typeNameString.lastIndexOf("/");
+    if (pos <= 0 || pos == typeNameString.length() - 1) {
+      throw new IllegalArgumentException(
+          typeNameString + " does not conform to the <namespace>/<name> format");
+    }
+    String namespace = typeNameString.substring(0, pos);
+    String name = typeNameString.substring(pos + 1);
+    return typeNameOf(namespace, name);
+  }
+
+  /**
+   * Creates a {@link TypeName}.
+   *
+   * @param namespace the function type's namespace.
+   * @param type the function type's name.
+   */
+  private TypeName(String namespace, String type) {
+    this.namespace = Objects.requireNonNull(namespace);
+    this.type = Objects.requireNonNull(type);
+    String typenameString = canonicalTypeNameString(namespace, type);
+    this.typenameString = typenameString;
+    this.typenameByteString = ByteString.copyFromUtf8(typenameString);
+  }
+
+  /**
+   * Returns the namespace of the function type.
+   *
+   * @return the namespace of the function type.
+   */
+  public String namespace() {
+    return namespace;
+  }
+
+  /**
+   * Returns the name of the function type.
+   *
+   * @return the name of the function type.
+   */
+  public String name() {
+    return type;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TypeName functionType = (TypeName) o;
+    return namespace.equals(functionType.namespace) && type.equals(functionType.type);
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 0;
+    hash = 37 * hash + namespace.hashCode();
+    hash = 37 * hash + type.hashCode();
+    return hash;
+  }
+
+  @Override
+  public String toString() {
+    return "TypeName(" + namespace + ", " + type + ")";
+  }
+
+  public String asTypeNameString() {
+    return typenameString;
+  }
+
+  ByteString typeNameByteString() {
+    return typenameByteString;
+  }
+
+  private static String canonicalTypeNameString(String namespace, String type) {
+    return namespace + '/' + type;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
new file mode 100644
index 0000000..d62d2b7
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/ValueSpec.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java;
+
+import com.google.protobuf.ByteString;
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.Types;
+
+public final class ValueSpec<T> {
+
+  public static Untyped named(String name) {
+    Objects.requireNonNull(name);
+    return new Untyped(name);
+  }
+
+  private final String name;
+  private final Expiration expiration;
+  private final Type<T> type;
+  private final ByteString nameByteString;
+
+  private ValueSpec(Untyped untyped, Type<T> type) {
+    Objects.requireNonNull(untyped);
+    Objects.requireNonNull(type);
+    this.name = untyped.stateName;
+    this.expiration = untyped.expiration;
+    this.type = Objects.requireNonNull(type);
+    this.nameByteString = ByteString.copyFromUtf8(untyped.stateName);
+  }
+
+  public String name() {
+    return name;
+  }
+
+  public Expiration expiration() {
+    return expiration;
+  }
+
+  public TypeName typeName() {
+    return type.typeName();
+  }
+
+  public Type<T> type() {
+    return type;
+  }
+
+  ByteString nameByteString() {
+    return nameByteString;
+  }
+
+  public static final class Untyped {
+    private final String stateName;
+    private Expiration expiration = Expiration.none();
+
+    public Untyped(String name) {
+      this.stateName = Objects.requireNonNull(name);
+    }
+
+    public Untyped thatExpireAfterWrite(Duration duration) {
+      this.expiration = Expiration.expireAfterWriting(duration);
+      return this;
+    }
+
+    public Untyped thatExpiresAfterReadOrWrite(Duration duration) {
+      this.expiration = Expiration.expireAfterReadingOrWriting(duration);
+      return this;
+    }
+
+    public ValueSpec<Integer> withIntType() {
+      return withCustomType(Types.integerType());
+    }
+
+    public ValueSpec<Long> withLongType() {
+      return withCustomType(Types.longType());
+    }
+
+    public ValueSpec<Float> withFloatType() {
+      return withCustomType(Types.floatType());
+    }
+
+    public ValueSpec<Double> withDoubleType() {
+      return withCustomType(Types.doubleType());
+    }
+
+    public ValueSpec<String> withUtf8String() {
+      return withCustomType(Types.stringType());
+    }
+
+    public ValueSpec<Boolean> withBooleanType() {
+      return new ValueSpec<>(this, Types.booleanType());
+    }
+
+    public <T> ValueSpec<T> withCustomType(Type<T> type) {
+      Objects.requireNonNull(type);
+      return new ValueSpec<>(this, type);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java
new file mode 100644
index 0000000..a544b71
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/annotations/Internal.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.annotations;
+
+import java.lang.annotation.Documented;
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Target;
+
+/**
+ * Methods, constructors or classes annotated with this annotation are used internally by the
+ * SDK.
+ */
+@Documented
+@Target({ElementType.CONSTRUCTOR, ElementType.METHOD, ElementType.TYPE})
+public @interface Internal {}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java
new file mode 100644
index 0000000..49e9fcc
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentContext.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.getTypedValue;
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.protoAddressFromSdk;
+
+import java.time.Duration;
+import java.util.Objects;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+
+/**
+ * A {@linkplain Context} whose send methods may be called from multiple threads.
+ *
+ * <p>An instance accumulates the outgoing messages, delayed messages and egress messages of a
+ * batch into a single {@linkplain FromFunction.InvocationResponse.Builder}. Each of the
+ * send/sendAfter/send(egress) methods appends to that builder while holding the builder's monitor.
+ * {@link #finalBuilder()} seals the context under the same monitor; any send attempted after that
+ * point fails with an {@link IllegalStateException}.
+ */
+final class ConcurrentContext implements Context {
+  private final Address self;
+  private final FromFunction.InvocationResponse.Builder responseBuilder;
+  private final ConcurrentAddressScopedStorage storage;
+
+  // Written by setCaller() and read by caller() without taking the builder's monitor.
+  private Address caller;
+
+  // Read and written only while holding responseBuilder's monitor.
+  private boolean noFurtherModificationsAllowed;
+
+  public ConcurrentContext(
+      Address self,
+      FromFunction.InvocationResponse.Builder responseBuilder,
+      ConcurrentAddressScopedStorage storage) {
+    this.self = Objects.requireNonNull(self);
+    this.responseBuilder = Objects.requireNonNull(responseBuilder);
+    this.storage = Objects.requireNonNull(storage);
+  }
+
+  @Override
+  public Address self() {
+    return self;
+  }
+
+  @Override
+  public Optional<Address> caller() {
+    return Optional.ofNullable(caller);
+  }
+
+  @Override
+  public AddressScopedStorage storage() {
+    return storage;
+  }
+
+  /** Records the caller of the invocation that is about to run; may be {@code null}. */
+  void setCaller(Address address) {
+    this.caller = address;
+  }
+
+  /** Seals this context and hands back the accumulated response builder. */
+  FromFunction.InvocationResponse.Builder finalBuilder() {
+    synchronized (responseBuilder) {
+      noFurtherModificationsAllowed = true;
+      return responseBuilder;
+    }
+  }
+
+  @Override
+  public void send(Message message) {
+    Objects.requireNonNull(message);
+
+    // The protobuf message is assembled outside of the lock; only the append is guarded.
+    FromFunction.Invocation.Builder invocation = FromFunction.Invocation.newBuilder();
+    invocation.setArgument(getTypedValue(message));
+    invocation.setTarget(protoAddressFromSdk(message.targetAddress()));
+    FromFunction.Invocation outgoing = invocation.build();
+
+    synchronized (responseBuilder) {
+      ensureNotSealed();
+      responseBuilder.addOutgoingMessages(outgoing);
+    }
+  }
+
+  @Override
+  public void sendAfter(Duration duration, Message message) {
+    Objects.requireNonNull(duration);
+    Objects.requireNonNull(message);
+
+    FromFunction.DelayedInvocation.Builder invocation =
+        FromFunction.DelayedInvocation.newBuilder();
+    invocation.setArgument(getTypedValue(message));
+    invocation.setTarget(protoAddressFromSdk(message.targetAddress()));
+    invocation.setDelayInMs(duration.toMillis());
+    FromFunction.DelayedInvocation delayed = invocation.build();
+
+    synchronized (responseBuilder) {
+      ensureNotSealed();
+      responseBuilder.addDelayedInvocations(delayed);
+    }
+  }
+
+  @Override
+  public void send(EgressMessage message) {
+    Objects.requireNonNull(message);
+
+    TypeName egressId = message.targetEgressId();
+
+    FromFunction.EgressMessage.Builder egress = FromFunction.EgressMessage.newBuilder();
+    egress.setArgument(getTypedValue(message));
+    egress.setEgressNamespace(egressId.namespace());
+    egress.setEgressType(egressId.name());
+    FromFunction.EgressMessage outgoing = egress.build();
+
+    synchronized (responseBuilder) {
+      ensureNotSealed();
+      responseBuilder.addOutgoingEgresses(outgoing);
+    }
+  }
+
+  /** Must be called while holding {@code responseBuilder}'s monitor. */
+  private void ensureNotSealed() {
+    if (!noFurtherModificationsAllowed) {
+      return;
+    }
+    throw new IllegalStateException("Function has already completed its execution.");
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java
new file mode 100644
index 0000000..7397525
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandler.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.MoreFutures.applySequentially;
+import static org.apache.flink.statefun.sdk.java.handler.ProtoUtils.sdkAddressFromProto;
+
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.message.MessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.storage.ConcurrentAddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.storage.StateValueContexts;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * A {@linkplain RequestReplyHandler} meant to be shared for the lifetime of the program and to be
+ * used concurrently to handle {@linkplain ToFunction} requests. Its only instance state is the
+ * map of function specs it was created with.
+ */
+@Internal
+public final class ConcurrentRequestReplyHandler implements RequestReplyHandler {
+  private final Map<TypeName, StatefulFunctionSpec> functionSpecs;
+
+  public ConcurrentRequestReplyHandler(Map<TypeName, StatefulFunctionSpec> functionSpecs) {
+    this.functionSpecs = Objects.requireNonNull(functionSpecs);
+  }
+
+  /**
+   * Parses {@code requestBytes} as a {@link ToFunction}, handles it, and serializes the resulting
+   * {@link FromFunction}. Anything thrown synchronously is returned as a failed future.
+   */
+  @Override
+  public CompletableFuture<Slice> handle(Slice requestBytes) {
+    try {
+      ToFunction request = ToFunction.parseFrom(SliceProtobufUtil.asByteString(requestBytes));
+      return handleInternally(request).thenApply(ConcurrentRequestReplyHandler::toSlice);
+    } catch (Throwable throwable) {
+      return MoreFutures.exceptional(throwable);
+    }
+  }
+
+  /**
+   * Resolves the target function and its state for the batch in {@code request} and runs the
+   * batch. A request without an invocation yields the default {@link FromFunction}; a batch with
+   * missing state values yields an incomplete-invocation response instead of being executed.
+   */
+  CompletableFuture<FromFunction> handleInternally(ToFunction request) {
+    if (!request.hasInvocation()) {
+      return CompletableFuture.completedFuture(FromFunction.getDefaultInstance());
+    }
+    final ToFunction.InvocationBatchRequest batch = request.getInvocation();
+    final Address self = sdkAddressFromProto(batch.getTarget());
+
+    final StatefulFunctionSpec targetSpec = functionSpecs.get(self.type());
+    if (targetSpec == null) {
+      throw new IllegalStateException("Unknown target type " + self);
+    }
+    final Supplier<? extends StatefulFunction> supplier =
+        Objects.requireNonNull(
+            targetSpec.supplier(), () -> "missing function supplier for " + self);
+    final StatefulFunction function =
+        Objects.requireNonNull(
+            supplier.get(), () -> "supplier for " + self + " supplied NULL function.");
+
+    final StateValueContexts.ResolutionResult resolution =
+        StateValueContexts.resolve(targetSpec.knownValues(), batch.getStateList());
+    if (!resolution.hasMissingValues()) {
+      return executeBatch(
+          batch, self, new ConcurrentAddressScopedStorage(resolution.resolved()), function);
+    }
+    // not enough information to compute this batch.
+    return CompletableFuture.completedFuture(
+        buildIncompleteInvocationResponse(resolution.missingValues()));
+  }
+
+  /** Applies the function to every invocation of the batch, one after the other. */
+  private CompletableFuture<FromFunction> executeBatch(
+      ToFunction.InvocationBatchRequest inputBatch,
+      Address self,
+      ConcurrentAddressScopedStorage storage,
+      StatefulFunction function) {
+
+    final ConcurrentContext context =
+        new ConcurrentContext(self, FromFunction.InvocationResponse.newBuilder(), storage);
+
+    return applySequentially(
+            inputBatch.getInvocationsList(), invocation -> apply(function, context, invocation))
+        .thenApply(unused -> finalizeResponse(storage, context.finalBuilder()));
+  }
+
+  /** Invokes the user function with a single invocation of the batch. */
+  private static CompletableFuture<Void> apply(
+      StatefulFunction function, ConcurrentContext context, ToFunction.Invocation invocation)
+      throws Throwable {
+    TypedValue argument = invocation.getArgument();
+    MessageWrapper wrapper = new MessageWrapper(context.self(), argument);
+    context.setCaller(sdkAddressFromProto(invocation.getCaller()));
+    CompletableFuture<Void> result = function.apply(context, wrapper);
+    if (result != null) {
+      return result;
+    }
+    throw new IllegalStateException(
+        "User function " + context.self() + " has returned a NULL future.");
+  }
+
+  /** Builds the response that lists the state values missing from the request. */
+  private static FromFunction buildIncompleteInvocationResponse(List<ValueSpec<?>> missing) {
+    FromFunction.IncompleteInvocationContext.Builder incomplete =
+        FromFunction.IncompleteInvocationContext.newBuilder();
+    missing.forEach(spec -> incomplete.addMissingValues(ProtoUtils.protoFromValueSpec(spec)));
+    return FromFunction.newBuilder().setIncompleteInvocationContext(incomplete).build();
+  }
+
+  /** Adds the storage's mutations to the builder and wraps it into a {@link FromFunction}. */
+  private static FromFunction finalizeResponse(
+      ConcurrentAddressScopedStorage storage, FromFunction.InvocationResponse.Builder builder) {
+    storage.addMutations(builder::addStateMutations);
+    return FromFunction.newBuilder().setInvocationResult(builder).build();
+  }
+
+  /** Serializes a response to a {@link Slice}. */
+  private static Slice toSlice(FromFunction response) {
+    ByteString out = response.toByteString();
+    return SliceProtobufUtil.asSlice(out);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java
new file mode 100644
index 0000000..bd3a9e1
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/MoreFutures.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import java.util.Iterator;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+
+/** Package-private static helpers for working with {@link CompletableFuture}s. */
+final class MoreFutures {
+  private MoreFutures() {}
+
+  /** A function-like callback that is allowed to throw any {@link Throwable}. */
+  @FunctionalInterface
+  public interface Fn<I, O> {
+    O apply(I input) throws Throwable;
+  }
+
+  /**
+   * Applies {@code fn} to each element of {@code elements} sequentially. The next element is
+   * handed to {@code fn} only after the future returned for the previous element has completed.
+   *
+   * @param elements the elements to process, in iteration order; must not be {@code null}.
+   * @param fn the callback to apply to each element; must not be {@code null} and must not return
+   *     {@code null}.
+   * @return a future that completes normally once every element was processed, or exceptionally
+   *     as soon as {@code fn} throws, returns {@code null}, or returns a future that fails. The
+   *     remaining elements are not processed in that case.
+   * @throws NullPointerException if {@code elements} or {@code fn} is {@code null}.
+   */
+  public static <T> CompletableFuture<Void> applySequentially(
+      Iterable<T> elements, Fn<T, CompletableFuture<Void>> fn) {
+    Objects.requireNonNull(elements);
+    Objects.requireNonNull(fn);
+    return applySequentially(elements.iterator(), fn);
+  }
+
+  private static <T> CompletableFuture<Void> applySequentially(
+      Iterator<T> iterator, Fn<T, CompletableFuture<Void>> fn) {
+    try {
+      while (iterator.hasNext()) {
+        T next = iterator.next();
+        CompletableFuture<Void> future =
+            Objects.requireNonNull(fn.apply(next), "fn returned a null future");
+        if (!future.isDone()) {
+          // Still pending: resume with the rest of the iterator once it completes normally.
+          // A failure propagates through thenCompose and the remaining elements are skipped.
+          return future.thenCompose(ignored -> applySequentially(iterator, fn));
+        }
+        if (future.isCompletedExceptionally()) {
+          return future;
+        }
+        // Already completed normally: keep looping on the current thread.
+      }
+      return CompletableFuture.completedFuture(null);
+    } catch (Throwable t) {
+      return exceptional(t);
+    }
+  }
+
+  /** Returns a new future that is already completed exceptionally with {@code cause}. */
+  static <T> CompletableFuture<T> exceptional(Throwable cause) {
+    CompletableFuture<T> e = new CompletableFuture<>();
+    e.completeExceptionally(cause);
+    return e;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java
new file mode 100644
index 0000000..cbeb5c3
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/ProtoUtils.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec.ExpireMode;
+import static org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.ExpirationSpec.newBuilder;
+
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.Expiration;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.message.MessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueSpec;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/** Package-private conversions between SDK types and their generated Protobuf counterparts. */
+final class ProtoUtils {
+  private ProtoUtils() {}
+
+  /** Converts an SDK address to a Protobuf {@link Address}. */
+  static Address protoAddressFromSdk(org.apache.flink.statefun.sdk.java.Address address) {
+    Address.Builder proto = Address.newBuilder();
+    proto.setNamespace(address.type().namespace());
+    proto.setType(address.type().name());
+    proto.setId(address.id());
+    return proto.build();
+  }
+
+  /**
+   * Converts a Protobuf {@link Address} to an SDK address.
+   *
+   * @return the SDK address, or {@code null} if {@code address} is {@code null} or has an empty
+   *     namespace, type and id.
+   */
+  static org.apache.flink.statefun.sdk.java.Address sdkAddressFromProto(Address address) {
+    if (address == null) {
+      return null;
+    }
+    final boolean unset =
+        address.getNamespace().isEmpty()
+            && address.getType().isEmpty()
+            && address.getId().isEmpty();
+    if (unset) {
+      return null;
+    }
+    TypeName functionType = TypeName.typeNameOf(address.getNamespace(), address.getType());
+    return new org.apache.flink.statefun.sdk.java.Address(functionType, address.getId());
+  }
+
+  /**
+   * Describes {@code valueSpec} as a {@link PersistedValueSpec} builder. An expiration spec is
+   * attached only when the value spec's expiration mode is not {@code NONE}.
+   */
+  static PersistedValueSpec.Builder protoFromValueSpec(ValueSpec<?> valueSpec) {
+    final PersistedValueSpec.Builder spec = PersistedValueSpec.newBuilder();
+    spec.setStateNameBytes(ApiExtension.stateNameByteString(valueSpec));
+    spec.setTypeTypenameBytes(ApiExtension.typeNameByteString(valueSpec.typeName()));
+
+    final Expiration expiration = valueSpec.expiration();
+    if (expiration.mode() != Expiration.Mode.NONE) {
+      final ExpireMode mode;
+      if (expiration.mode() == Expiration.Mode.AFTER_READ_OR_WRITE) {
+        mode = ExpireMode.AFTER_INVOKE;
+      } else {
+        mode = ExpireMode.AFTER_WRITE;
+      }
+      final long millis = expiration.duration().toMillis();
+      // newBuilder() is the statically imported ExpirationSpec.newBuilder().
+      spec.setExpirationSpec(newBuilder().setExpireAfterMillis(millis).setMode(mode));
+    }
+    return spec;
+  }
+
+  /** Returns the wrapped {@link TypedValue} if available, otherwise builds one from the message. */
+  static TypedValue getTypedValue(Message message) {
+    if (message instanceof MessageWrapper) {
+      MessageWrapper wrapper = (MessageWrapper) message;
+      return wrapper.typedValue();
+    }
+    TypedValue.Builder typed = TypedValue.newBuilder();
+    typed.setTypenameBytes(ApiExtension.typeNameByteString(message.valueTypeName()));
+    typed.setValue(SliceProtobufUtil.asByteString(message.rawValue()));
+    return typed.build();
+  }
+
+  /** Returns the wrapped {@link TypedValue} if available, otherwise builds one from the message. */
+  static TypedValue getTypedValue(EgressMessage message) {
+    if (message instanceof EgressMessageWrapper) {
+      EgressMessageWrapper wrapper = (EgressMessageWrapper) message;
+      return wrapper.typedValue();
+    }
+    TypedValue.Builder typed = TypedValue.newBuilder();
+    typed.setTypenameBytes(ApiExtension.typeNameByteString(message.egressMessageValueType()));
+    typed.setValue(SliceProtobufUtil.asByteString(message.egressMessageValueBytes()));
+    return typed.build();
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java
new file mode 100644
index 0000000..ed7783c
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/handler/RequestReplyHandler.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.handler;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+/** Handles serialized {@code Stateful Functions} requests and produces serialized responses. */
+public interface RequestReplyHandler {
+
+  /**
+   * Handles a {@code Stateful Functions} invocation.
+   *
+   * @param input a {@linkplain Slice} as received from the {@code StateFun} server.
+   * @return a future holding a serialized representation of the side-effects to be performed by
+   *     the {@code StateFun} server, as the result of this invocation.
+   */
+  CompletableFuture<Slice> handle(Slice input);
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java
new file mode 100644
index 0000000..c55b582
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KafkaEgressMessage.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.io;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.egress.generated.KafkaProducerRecord;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * Entry point for building an {@link EgressMessage} that wraps a {@link KafkaProducerRecord}.
+ *
+ * <p>Obtain a {@link Builder} with {@link #forEgress(TypeName)}, set a topic and a value (a key is
+ * optional), and call {@link Builder#build()}.
+ */
+public final class KafkaEgressMessage {
+
+  /**
+   * Starts a builder for a record addressed to the given egress.
+   *
+   * @param targetEgressId the egress the built message is addressed to; must not be {@code null}.
+   * @return a new {@link Builder} with no topic, key or value set.
+   * @throws NullPointerException if {@code targetEgressId} is {@code null}.
+   */
+  public static Builder forEgress(TypeName targetEgressId) {
+    Objects.requireNonNull(targetEgressId);
+    return new Builder(targetEgressId);
+  }
+
+  /** A mutable builder; each {@code with...} call overwrites the previously set field value. */
+  public static final class Builder {
+    private static final TypeName KAFKA_PRODUCER_RECORD_TYPENAME =
+        TypeName.typeNameOf(
+            "type.googleapis.com", KafkaProducerRecord.getDescriptor().getFullName());
+
+    private final TypeName targetEgressId;
+    private ByteString targetTopic;
+    private ByteString keyBytes;
+    private ByteString value;
+
+    private Builder(TypeName targetEgressId) {
+      this.targetEgressId = targetEgressId;
+    }
+
+    /**
+     * Sets the target topic (required).
+     *
+     * @throws NullPointerException if {@code topic} is {@code null}.
+     */
+    public Builder withTopic(String topic) {
+      Objects.requireNonNull(topic);
+      this.targetTopic = ByteString.copyFromUtf8(topic);
+      return this;
+    }
+
+    /** Sets the record key (optional) to the UTF-8 bytes of {@code key}. */
+    public Builder withUtf8Key(String key) {
+      Objects.requireNonNull(key);
+      this.keyBytes = ByteString.copyFromUtf8(key);
+      return this;
+    }
+
+    /** Sets the record key (optional) to a copy of {@code key}. */
+    public Builder withKey(byte[] key) {
+      Objects.requireNonNull(key);
+      this.keyBytes = ByteString.copyFrom(key);
+      return this;
+    }
+
+    /** Sets the record key (optional) to the bytes of {@code slice}. */
+    public Builder withKey(Slice slice) {
+      Objects.requireNonNull(slice);
+      this.keyBytes = SliceProtobufUtil.asByteString(slice);
+      return this;
+    }
+
+    /** Sets the record key (optional) to {@code value} serialized with {@code type}. */
+    public <T> Builder withKey(Type<T> type, T value) {
+      TypeSerializer<T> serializer = type.typeSerializer();
+      return withKey(serializer.serialize(value));
+    }
+
+    /** Sets the record value (required) to the UTF-8 bytes of {@code value}. */
+    public Builder withUtf8Value(String value) {
+      Objects.requireNonNull(value);
+      this.value = ByteString.copyFromUtf8(value);
+      return this;
+    }
+
+    /**
+     * Sets the record value (required) to the bytes of {@code slice}.
+     *
+     * @throws NullPointerException if {@code slice} is {@code null}.
+     */
+    public Builder withValue(Slice slice) {
+      // Validate the argument, not the (initially null) 'value' field of this builder.
+      Objects.requireNonNull(slice);
+      this.value = SliceProtobufUtil.asByteString(slice);
+      return this;
+    }
+
+    /** Sets the record value (required) to {@code value} serialized with {@code type}. */
+    public <T> Builder withValue(Type<T> type, T value) {
+      TypeSerializer<T> serializer = type.typeSerializer();
+      return withValue(serializer.serialize(value));
+    }
+
+    /** Sets the record value (required) to a copy of {@code value}. */
+    public Builder withValue(byte[] value) {
+      Objects.requireNonNull(value);
+      this.value = ByteString.copyFrom(value);
+      return this;
+    }
+
+    /**
+     * Builds the egress message.
+     *
+     * @return an {@link EgressMessage} addressed to the target egress, carrying the serialized
+     *     {@link KafkaProducerRecord}.
+     * @throws IllegalStateException if no topic or no value was set.
+     */
+    public EgressMessage build() {
+      if (targetTopic == null) {
+        throw new IllegalStateException("A Kafka record requires a target topic.");
+      }
+      if (value == null) {
+        throw new IllegalStateException("A Kafka record requires value bytes");
+      }
+      KafkaProducerRecord.Builder builder =
+          KafkaProducerRecord.newBuilder().setTopicBytes(targetTopic).setValueBytes(value);
+      // The key is optional and is left unset on the record when none was provided.
+      if (keyBytes != null) {
+        builder.setKeyBytes(keyBytes);
+      }
+      KafkaProducerRecord record = builder.build();
+      TypedValue typedValue =
+          TypedValue.newBuilder()
+              .setTypenameBytes(ApiExtension.typeNameByteString(KAFKA_PRODUCER_RECORD_TYPENAME))
+              .setValue(record.toByteString())
+              .build();
+
+      return new EgressMessageWrapper(targetEgressId, typedValue);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java
new file mode 100644
index 0000000..000bb59
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/io/KinesisEgressMessage.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.io;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.egress.generated.KinesisEgressRecord;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.message.EgressMessage;
+import org.apache.flink.statefun.sdk.java.message.EgressMessageWrapper;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * Entry point for building an {@link EgressMessage} that wraps a {@link KinesisEgressRecord}.
+ *
+ * <p>Obtain a {@link Builder} with {@link #forEgress(TypeName)}, set a stream, a partition key and
+ * a value (an explicit hash key is optional), and call {@link Builder#build()}.
+ */
+public final class KinesisEgressMessage {
+
+  /**
+   * Starts a builder for a record addressed to the given egress.
+   *
+   * @param targetEgressId the egress the built message is addressed to; must not be {@code null}.
+   * @return a new {@link Builder} with no fields set.
+   * @throws NullPointerException if {@code targetEgressId} is {@code null}.
+   */
+  public static Builder forEgress(TypeName targetEgressId) {
+    Objects.requireNonNull(targetEgressId);
+    return new Builder(targetEgressId);
+  }
+
+  /** A mutable builder; each {@code with...} call overwrites the previously set field value. */
+  public static final class Builder {
+    private static final TypeName KINESIS_PRODUCER_RECORD_TYPENAME =
+        TypeName.typeNameOf(
+            "type.googleapis.com", KinesisEgressRecord.getDescriptor().getFullName());
+
+    private final TypeName targetEgressId;
+    private ByteString targetStreamBytes;
+    private ByteString partitionKeyBytes;
+    private ByteString valueBytes;
+    private ByteString explicitHashKey;
+
+    private Builder(TypeName targetEgressId) {
+      this.targetEgressId = targetEgressId;
+    }
+
+    /** Sets the destination stream (required) to the UTF-8 bytes of {@code stream}. */
+    public Builder withStream(String stream) {
+      Objects.requireNonNull(stream);
+      this.targetStreamBytes = ByteString.copyFromUtf8(stream);
+      return this;
+    }
+
+    /**
+     * Sets the destination stream (required) to the bytes of {@code stream}.
+     *
+     * @throws NullPointerException if {@code stream} is {@code null}.
+     */
+    public Builder withStream(Slice stream) {
+      // Fail fast on null, like every other setter of this builder.
+      Objects.requireNonNull(stream);
+      this.targetStreamBytes = SliceProtobufUtil.asByteString(stream);
+      return this;
+    }
+
+    /** Sets the partition key (required) to the UTF-8 bytes of {@code key}. */
+    public Builder withUtf8PartitionKey(String key) {
+      Objects.requireNonNull(key);
+      this.partitionKeyBytes = ByteString.copyFromUtf8(key);
+      return this;
+    }
+
+    /** Sets the partition key (required) to a copy of {@code key}. */
+    public Builder withPartitionKey(byte[] key) {
+      Objects.requireNonNull(key);
+      this.partitionKeyBytes = ByteString.copyFrom(key);
+      return this;
+    }
+
+    /** Sets the partition key (required) to the bytes of {@code key}. */
+    public Builder withPartitionKey(Slice key) {
+      Objects.requireNonNull(key);
+      this.partitionKeyBytes = SliceProtobufUtil.asByteString(key);
+      return this;
+    }
+
+    /** Sets the record value (required) to the UTF-8 bytes of {@code value}. */
+    public Builder withUtf8Value(String value) {
+      Objects.requireNonNull(value);
+      this.valueBytes = ByteString.copyFromUtf8(value);
+      return this;
+    }
+
+    /** Sets the record value (required) to a copy of {@code value}. */
+    public Builder withValue(byte[] value) {
+      Objects.requireNonNull(value);
+      this.valueBytes = ByteString.copyFrom(value);
+      return this;
+    }
+
+    /** Sets the record value (required) to the bytes of {@code value}. */
+    public Builder withValue(Slice value) {
+      Objects.requireNonNull(value);
+      this.valueBytes = SliceProtobufUtil.asByteString(value);
+      return this;
+    }
+
+    /** Sets the record value (required) to {@code value} serialized with {@code type}. */
+    public <T> Builder withValue(Type<T> type, T value) {
+      TypeSerializer<T> serializer = type.typeSerializer();
+      return withValue(serializer.serialize(value));
+    }
+
+    /** Sets the explicit hash key (optional) to the UTF-8 bytes of {@code value}. */
+    public Builder withUtf8ExplicitHashKey(String value) {
+      Objects.requireNonNull(value);
+      this.explicitHashKey = ByteString.copyFromUtf8(value);
+      return this;
+    }
+
+    /** Sets the explicit hash key (optional) to the bytes of {@code utf8Slice}. */
+    public Builder withUtf8ExplicitHashKey(Slice utf8Slice) {
+      Objects.requireNonNull(utf8Slice);
+      this.explicitHashKey = SliceProtobufUtil.asByteString(utf8Slice);
+      return this;
+    }
+
+    /**
+     * Builds the egress message.
+     *
+     * @return an {@link EgressMessage} addressed to the target egress, carrying the serialized
+     *     {@link KinesisEgressRecord}.
+     * @throws IllegalStateException if the stream, the partition key or the value was not set.
+     */
+    public EgressMessage build() {
+      KinesisEgressRecord.Builder builder = KinesisEgressRecord.newBuilder();
+      if (targetStreamBytes == null) {
+        throw new IllegalStateException("Missing destination Kinesis stream");
+      }
+      builder.setStreamBytes(targetStreamBytes);
+      if (partitionKeyBytes == null) {
+        throw new IllegalStateException("Missing partition key");
+      }
+      builder.setPartitionKeyBytes(partitionKeyBytes);
+      if (valueBytes == null) {
+        throw new IllegalStateException("Missing value");
+      }
+      builder.setValueBytes(valueBytes);
+      // The explicit hash key is optional and is left unset when none was provided.
+      if (explicitHashKey != null) {
+        builder.setExplicitHashKeyBytes(explicitHashKey);
+      }
+
+      TypedValue typedValue =
+          TypedValue.newBuilder()
+              .setTypenameBytes(ApiExtension.typeNameByteString(KINESIS_PRODUCER_RECORD_TYPENAME))
+              .setValue(builder.build().toByteString())
+              .build();
+
+      return new EgressMessageWrapper(targetEgressId, typedValue);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java
new file mode 100644
index 0000000..0b8be7b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessage.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.message;
+
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+/** A message that is destined to an egress, carrying a typed value as raw bytes. */
+public interface EgressMessage {
+  /** @return the {@link TypeName} that identifies the egress this message is sent to. */
+  TypeName targetEgressId();
+
+  /** @return the {@link TypeName} of the value carried by this message. */
+  TypeName egressMessageValueType();
+
+  /** @return the serialized bytes of the value carried by this message. */
+  Slice egressMessageValueBytes();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java
new file mode 100644
index 0000000..9c849a2
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/EgressMessageWrapper.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+@Internal
+public final class EgressMessageWrapper implements EgressMessage {
+  private final TypedValue typedValue;
+  private final TypeName targetEgressId;
+
+  public EgressMessageWrapper(TypeName targetEgressId, TypedValue actualMessage) {
+    this.targetEgressId = Objects.requireNonNull(targetEgressId);
+    this.typedValue = Objects.requireNonNull(actualMessage);
+  }
+
+  @Override
+  public TypeName targetEgressId() {
+    return targetEgressId;
+  }
+
+  @Override
+  public TypeName egressMessageValueType() {
+    return TypeName.typeNameFromString(typedValue.getTypename());
+  }
+
+  @Override
+  public Slice egressMessageValueBytes() {
+    return SliceProtobufUtil.asSlice(typedValue.getValue());
+  }
+
+  public TypedValue typedValue() {
+    return typedValue;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java
new file mode 100644
index 0000000..e2e3467
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/Message.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.message;
+
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.types.Type;
+
+/**
+ * A message addressed to a function, carrying a value together with the {@link TypeName} of that
+ * value.
+ *
+ * <p>The {@code isX()} methods test the type of the carried value, and the matching {@code asX()}
+ * methods deserialize it. The {@code asX()} methods are presumably meant to be called only after
+ * the matching {@code isX()} returned {@code true}; this interface does not specify what happens
+ * otherwise.
+ */
+public interface Message {
+  /** @return the address of the function this message is sent to. */
+  Address targetAddress();
+
+  /** @return {@code true} if the carried value is of the built-in long type. */
+  boolean isLong();
+
+  /** @return the carried value, deserialized as a {@code long}. */
+  long asLong();
+
+  /** @return {@code true} if the carried value is of the built-in string type. */
+  boolean isUtf8String();
+
+  /** @return the carried value, deserialized as a {@link String}. */
+  String asUtf8String();
+
+  /** @return {@code true} if the carried value is of the built-in integer type. */
+  boolean isInt();
+
+  /** @return the carried value, deserialized as an {@code int}. */
+  int asInt();
+
+  /** @return {@code true} if the carried value is of the built-in boolean type. */
+  boolean isBoolean();
+
+  /** @return the carried value, deserialized as a {@code boolean}. */
+  boolean asBoolean();
+
+  /** @return {@code true} if the carried value is of the built-in float type. */
+  boolean isFloat();
+
+  /** @return the carried value, deserialized as a {@code float}. */
+  float asFloat();
+
+  /** @return {@code true} if the carried value is of the built-in double type. */
+  boolean isDouble();
+
+  /** @return the carried value, deserialized as a {@code double}. */
+  double asDouble();
+
+  /**
+   * @param type the type to test against.
+   * @return {@code true} if the carried value is of the given {@code type}.
+   */
+  <T> boolean is(Type<T> type);
+
+  /**
+   * @param type the type used to deserialize the carried value.
+   * @return the carried value, deserialized as an instance of the given {@code type}.
+   */
+  <T> T as(Type<T> type);
+
+  /** @return the {@link TypeName} of the carried value. */
+  TypeName valueTypeName();
+
+  /** @return the serialized bytes of the carried value. */
+  Slice rawValue();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java
new file mode 100644
index 0000000..cd9f738
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageBuilder.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import com.google.protobuf.ByteString;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.ApiExtension;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * A fluent builder of {@link Message}s.
+ *
+ * <p>A message is made of a target {@link Address} and a typed value. The value is set with one of
+ * the {@code withValue} overloads for the built-in types, or with {@link #withCustomType(Type,
+ * Object)} for any other {@link Type}.
+ */
+public final class MessageBuilder {
+  private final TypedValue.Builder builder;
+  private Address targetAddress;
+
+  private MessageBuilder(TypeName functionType, String id, TypedValue.Builder builder) {
+    this.targetAddress = new Address(functionType, id);
+    this.builder = Objects.requireNonNull(builder);
+  }
+
+  // ------------------------------------------------------------------------------------------
+  // Factories
+  // ------------------------------------------------------------------------------------------
+
+  /** Creates a builder of a message targeted at the function of the given type and id. */
+  public static MessageBuilder forAddress(TypeName functionType, String id) {
+    return new MessageBuilder(functionType, id, TypedValue.newBuilder());
+  }
+
+  /**
+   * Creates a builder of a message targeted at the given address.
+   *
+   * @throws NullPointerException if {@code address} is {@code null}.
+   */
+  public static MessageBuilder forAddress(Address address) {
+    Objects.requireNonNull(address);
+    return forAddress(address.type(), address.id());
+  }
+
+  /**
+   * Creates a builder that starts out with the target address, the value type name and the value
+   * bytes of the given message.
+   */
+  public static MessageBuilder fromMessage(Message message) {
+    Address origin = message.targetAddress();
+    ByteString typename = ApiExtension.typeNameByteString(message.valueTypeName());
+    ByteString value = SliceProtobufUtil.asByteString(message.rawValue());
+    TypedValue.Builder seeded = TypedValue.newBuilder().setTypenameBytes(typename).setValue(value);
+    return new MessageBuilder(origin.type(), origin.id(), seeded);
+  }
+
+  // ------------------------------------------------------------------------------------------
+  // Target address
+  // ------------------------------------------------------------------------------------------
+
+  /**
+   * Replaces the target address of the message.
+   *
+   * @throws NullPointerException if {@code targetAddress} is {@code null}.
+   */
+  public MessageBuilder withTargetAddress(Address targetAddress) {
+    this.targetAddress = Objects.requireNonNull(targetAddress);
+    return this;
+  }
+
+  /** Replaces the target address of the message with the given function type and id. */
+  public MessageBuilder withTargetAddress(TypeName typeName, String id) {
+    return withTargetAddress(new Address(typeName, id));
+  }
+
+  // ------------------------------------------------------------------------------------------
+  // Value
+  // ------------------------------------------------------------------------------------------
+
+  /** Sets a value of the built-in boolean type. */
+  public MessageBuilder withValue(boolean value) {
+    return withCustomType(Types.booleanType(), value);
+  }
+
+  /** Sets a value of the built-in integer type. */
+  public MessageBuilder withValue(int value) {
+    return withCustomType(Types.integerType(), value);
+  }
+
+  /** Sets a value of the built-in long type. */
+  public MessageBuilder withValue(long value) {
+    return withCustomType(Types.longType(), value);
+  }
+
+  /** Sets a value of the built-in float type. */
+  public MessageBuilder withValue(float value) {
+    return withCustomType(Types.floatType(), value);
+  }
+
+  /** Sets a value of the built-in double type. */
+  public MessageBuilder withValue(double value) {
+    return withCustomType(Types.doubleType(), value);
+  }
+
+  /** Sets a value of the built-in string type. */
+  public MessageBuilder withValue(String value) {
+    return withCustomType(Types.stringType(), value);
+  }
+
+  /**
+   * Sets the value of the message, serialized with the serializer of the given type.
+   *
+   * @param customType the type of the value; must not be {@code null}.
+   * @param element the value; must not be {@code null}.
+   * @return this builder.
+   * @throws NullPointerException if either argument is {@code null}.
+   */
+  public <T> MessageBuilder withCustomType(Type<T> customType, T element) {
+    Objects.requireNonNull(customType);
+    Objects.requireNonNull(element);
+    TypeSerializer<T> serializer = customType.typeSerializer();
+    // the type name is recorded before the element is serialized.
+    ByteString typename = ApiExtension.typeNameByteString(customType.typeName());
+    builder.setTypenameBytes(typename);
+    Slice bytes = serializer.serialize(element);
+    builder.setValue(SliceProtobufUtil.asByteString(bytes));
+    return this;
+  }
+
+  /** Builds the message out of the current target address and typed value. */
+  public Message build() {
+    TypedValue value = builder.build();
+    return new MessageWrapper(targetAddress, value);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java
new file mode 100644
index 0000000..74c5955
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/message/MessageWrapper.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.message;
+
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.Address;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/** A {@link Message} backed by a target {@link Address} and a Protobuf {@link TypedValue}. */
+@Internal
+public final class MessageWrapper implements Message {
+  private final Address targetAddress;
+  private final TypedValue typedValue;
+
+  /**
+   * @param targetAddress the address this message is sent to; must not be {@code null}.
+   * @param typedValue the value carried by this message; must not be {@code null}.
+   */
+  public MessageWrapper(Address targetAddress, TypedValue typedValue) {
+    Objects.requireNonNull(targetAddress);
+    Objects.requireNonNull(typedValue);
+    this.targetAddress = targetAddress;
+    this.typedValue = typedValue;
+  }
+
+  /** @return the underlying Protobuf typed value. */
+  public TypedValue typedValue() {
+    return typedValue;
+  }
+
+  @Override
+  public Address targetAddress() {
+    return targetAddress;
+  }
+
+  @Override
+  public TypeName valueTypeName() {
+    String typename = typedValue.getTypename();
+    return TypeName.typeNameFromString(typename);
+  }
+
+  @Override
+  public Slice rawValue() {
+    return SliceProtobufUtil.asSlice(typedValue.getValue());
+  }
+
+  // ------------------------------------------------------------------------------------------
+  // Generic accessors
+  // ------------------------------------------------------------------------------------------
+
+  /** Compares the type name string of the carried value with the type name of {@code type}. */
+  @Override
+  public <T> boolean is(Type<T> type) {
+    return typedValue.getTypename().equals(type.typeName().asTypeNameString());
+  }
+
+  /**
+   * Deserializes the carried value with the serializer of {@code type}. The type name of the
+   * carried value is not checked here.
+   */
+  @Override
+  public <T> T as(Type<T> type) {
+    TypeSerializer<T> serializer = type.typeSerializer();
+    Slice bytes = SliceProtobufUtil.asSlice(typedValue.getValue());
+    return serializer.deserialize(bytes);
+  }
+
+  // ------------------------------------------------------------------------------------------
+  // Built-in types; all delegate to the generic accessors above
+  // ------------------------------------------------------------------------------------------
+
+  @Override
+  public boolean isBoolean() {
+    return is(Types.booleanType());
+  }
+
+  @Override
+  public boolean asBoolean() {
+    return as(Types.booleanType());
+  }
+
+  @Override
+  public boolean isInt() {
+    return is(Types.integerType());
+  }
+
+  @Override
+  public int asInt() {
+    return as(Types.integerType());
+  }
+
+  @Override
+  public boolean isLong() {
+    return is(Types.longType());
+  }
+
+  @Override
+  public long asLong() {
+    return as(Types.longType());
+  }
+
+  @Override
+  public boolean isFloat() {
+    return is(Types.floatType());
+  }
+
+  @Override
+  public float asFloat() {
+    return as(Types.floatType());
+  }
+
+  @Override
+  public boolean isDouble() {
+    return is(Types.doubleType());
+  }
+
+  @Override
+  public double asDouble() {
+    return as(Types.doubleType());
+  }
+
+  @Override
+  public boolean isUtf8String() {
+    return is(Types.stringType());
+  }
+
+  @Override
+  public String asUtf8String() {
+    return as(Types.stringType());
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java
new file mode 100644
index 0000000..c28aa58
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/ByteStringSlice.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/** A {@link Slice} that delegates every operation to an underlying Protobuf {@link ByteString}. */
+final class ByteStringSlice implements Slice {
+  private final ByteString byteString;
+
+  /** @param bytes the bytes backing this slice; must not be {@code null}. */
+  public ByteStringSlice(ByteString bytes) {
+    Objects.requireNonNull(bytes);
+    this.byteString = bytes;
+  }
+
+  /** @return the {@link ByteString} backing this slice. */
+  public ByteString byteString() {
+    return byteString;
+  }
+
+  @Override
+  public int readableBytes() {
+    return byteString.size();
+  }
+
+  @Override
+  public byte byteAt(int position) {
+    return byteString.byteAt(position);
+  }
+
+  @Override
+  public ByteBuffer asReadOnlyByteBuffer() {
+    return byteString.asReadOnlyByteBuffer();
+  }
+
+  @Override
+  public byte[] toByteArray() {
+    return byteString.toByteArray();
+  }
+
+  /** Copies the bytes into {@code target}, starting at the beginning of the array. */
+  @Override
+  public void copyTo(byte[] target) {
+    byteString.copyTo(target, 0);
+  }
+
+  @Override
+  public void copyTo(byte[] target, int targetOffset) {
+    byteString.copyTo(target, targetOffset);
+  }
+
+  @Override
+  public void copyTo(ByteBuffer buffer) {
+    byteString.copyTo(buffer);
+  }
+
+  /**
+   * Writes the bytes to the given stream.
+   *
+   * @throws IllegalStateException wrapping any {@link IOException} raised by the stream.
+   */
+  @Override
+  public void copyTo(OutputStream outputStream) {
+    try {
+      byteString.writeTo(outputStream);
+    } catch (IOException cause) {
+      throw new IllegalStateException(cause);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java
new file mode 100644
index 0000000..8c6187e
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slice.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/** A readable sequence of bytes that can be inspected and copied to various targets. */
+public interface Slice {
+
+  /** @return the number of bytes in this slice. */
+  int readableBytes();
+
+  /** Copies the bytes of this slice into the given {@link ByteBuffer}. */
+  void copyTo(ByteBuffer target);
+
+  /** Copies the bytes of this slice into {@code target}, starting at index {@code 0}. */
+  void copyTo(byte[] target);
+
+  /** Copies the bytes of this slice into {@code target}, starting at index {@code targetOffset}. */
+  void copyTo(byte[] target, int targetOffset);
+
+  /** Writes the bytes of this slice to the given {@link OutputStream}. */
+  void copyTo(OutputStream outputStream);
+
+  /** @return the byte at the given {@code position} of this slice. */
+  byte byteAt(int position);
+
+  /** @return a read-only {@link ByteBuffer} over the bytes of this slice. */
+  ByteBuffer asReadOnlyByteBuffer();
+
+  /** @return the bytes of this slice as a byte array. */
+  byte[] toByteArray();
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java
new file mode 100644
index 0000000..2573e4f
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceOutput.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Objects;
+
+public final class SliceOutput {
+  private byte[] buf;
+  private int position;
+
+  public static SliceOutput sliceOutput(int initialSize) {
+    return new SliceOutput(initialSize);
+  }
+
+  private SliceOutput(int initialSize) {
+    if (initialSize < 0) {
+      throw new IllegalArgumentException("initial size has to be non negative");
+    }
+    this.buf = new byte[initialSize];
+    this.position = 0;
+  }
+
+  public void write(byte b) {
+    ensureCapacity(1);
+    buf[position] = b;
+    position++;
+  }
+
+  public void write(byte[] buffer) {
+    write(buffer, 0, buffer.length);
+  }
+
+  public void write(byte[] buffer, int offset, int len) {
+    Objects.requireNonNull(buffer);
+    if (offset < 0 || offset > buffer.length) {
+      throw new IllegalArgumentException("Offset out of range " + offset);
+    }
+    if (len < 0) {
+      throw new IllegalArgumentException("Negative length " + len);
+    }
+    ensureCapacity(len);
+    System.arraycopy(buffer, offset, buf, position, len);
+    position += len;
+  }
+
+  public void write(ByteBuffer buffer) {
+    int n = buffer.remaining();
+    ensureCapacity(n);
+    buffer.get(buf, position, n);
+    position += n;
+  }
+
+  public void write(Slice slice) {
+    write(slice.asReadOnlyByteBuffer());
+  }
+
+  public void writeFully(InputStream input) {
+    try {
+      int bytesRead;
+      do {
+        ensureCapacity(256);
+        bytesRead = input.read(buf, position, remaining());
+        position += bytesRead;
+      } while (bytesRead != -1);
+      position++; // compensate for the latest -1 addition.
+    } catch (IOException e) {
+      throw new IllegalStateException(e);
+    }
+  }
+
+  public Slice copyOf() {
+    return Slices.copyOf(buf, 0, position);
+  }
+
+  public Slice view() {
+    return Slices.wrap(buf, 0, position);
+  }
+
+  private int remaining() {
+    return buf.length - position;
+  }
+
+  private void ensureCapacity(final int bytesNeeded) {
+    final int requiredNewLength = position + bytesNeeded;
+    if (requiredNewLength >= buf.length) {
+      this.buf = Arrays.copyOf(buf, 2 * requiredNewLength);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java
new file mode 100644
index 0000000..2a87b5b
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtil.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.Message;
+import com.google.protobuf.MoreByteStrings;
+import com.google.protobuf.Parser;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+
+@Internal
+public final class SliceProtobufUtil {
+  private SliceProtobufUtil() {}
+
+  public static <T> T parseFrom(Parser<T> parser, Slice slice)
+      throws InvalidProtocolBufferException {
+    if (slice instanceof ByteStringSlice) {
+      ByteString byteString = ((ByteStringSlice) slice).byteString();
+      return parser.parseFrom(byteString);
+    }
+    return parser.parseFrom(slice.asReadOnlyByteBuffer());
+  }
+
+  public static Slice toSlice(Message message) {
+    return Slices.wrap(message.toByteArray());
+  }
+
+  public static ByteString asByteString(Slice slice) {
+    if (slice instanceof ByteStringSlice) {
+      ByteStringSlice byteStringSlice = (ByteStringSlice) slice;
+      return byteStringSlice.byteString();
+    }
+    return MoreByteStrings.wrap(slice.asReadOnlyByteBuffer());
+  }
+
+  public static Slice asSlice(ByteString byteString) {
+    return new ByteStringSlice(byteString);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java
new file mode 100644
index 0000000..7f42291
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/slice/Slices.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.MoreByteStrings;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Static factories of {@link Slice}s.
+ *
+ * <p>The {@code wrap} factories go through {@code MoreByteStrings.wrap} (presumably without
+ * copying the input - confirm against that class), while the {@code copyOf} and {@code
+ * copyFromUtf8} factories copy their input.
+ */
+public final class Slices {
+  private Slices() {}
+
+  /** Wraps the given byte array as a slice. */
+  public static Slice wrap(byte[] bytes) {
+    return new ByteStringSlice(MoreByteStrings.wrap(bytes));
+  }
+
+  /** Wraps {@code len} bytes of the given array, starting at {@code offset}, as a slice. */
+  public static Slice wrap(byte[] bytes, int offset, int len) {
+    return new ByteStringSlice(MoreByteStrings.wrap(bytes, offset, len));
+  }
+
+  /** Wraps the given buffer as a slice. */
+  public static Slice wrap(ByteBuffer buffer) {
+    return new ByteStringSlice(MoreByteStrings.wrap(buffer));
+  }
+
+  /** Creates a slice holding a copy of the given byte array. */
+  public static Slice copyOf(byte[] bytes) {
+    return new ByteStringSlice(ByteString.copyFrom(bytes));
+  }
+
+  /** Creates a slice holding a copy of {@code len} bytes of the array, starting at {@code offset}. */
+  public static Slice copyOf(byte[] bytes, int offset, int len) {
+    return new ByteStringSlice(ByteString.copyFrom(bytes, offset, len));
+  }
+
+  /**
+   * Reads the given stream until its end, and returns its content as a slice.
+   *
+   * @param inputStream the stream to read.
+   * @param expectedStreamSize the initial size of the buffer that the stream is read into.
+   */
+  public static Slice copyOf(InputStream inputStream, int expectedStreamSize) {
+    SliceOutput sink = SliceOutput.sliceOutput(expectedStreamSize);
+    sink.writeFully(inputStream);
+    // the sink is local to this method, hence a view over its buffer is sufficient.
+    return sink.view();
+  }
+
+  /** Creates a slice holding the UTF-8 encoding of the given string. */
+  public static Slice copyFromUtf8(String input) {
+    return new ByteStringSlice(ByteString.copyFromUtf8(input));
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
new file mode 100644
index 0000000..d692c13
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorage.java
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.TypeCharacteristics;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction.PersistedValueMutation;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+
+/**
+ * An {@link AddressScopedStorage} that keeps one {@code Cell} per registered {@link ValueSpec}.
+ *
+ * <p>Every cell guards its content with its own {@link ReentrantLock} and remembers whether the
+ * value was modified or deleted, so that {@link #addMutations(Consumer)} can report the changes
+ * as {@link PersistedValueMutation}s.
+ */
+@Internal
+public final class ConcurrentAddressScopedStorage implements AddressScopedStorage {
+
+  private final List<Cell<?>> cells;
+
+  /** Creates a storage with one cell for each of the given resolved state values. */
+  public ConcurrentAddressScopedStorage(List<StateValueContext<?>> stateValues) {
+    this.cells = createCells(stateValues);
+  }
+
+  /**
+   * Returns the current value of the state, or an empty {@link Optional} if the state has no
+   * value or was removed.
+   *
+   * @throws IllegalStorageAccessException if no cell matches {@code valueSpec}.
+   */
+  @Override
+  public <T> Optional<T> get(ValueSpec<T> valueSpec) {
+    final Cell<T> cell = getCellOrThrow(valueSpec);
+    return cell.get();
+  }
+
+  /**
+   * Replaces the value of the state.
+   *
+   * @throws IllegalStorageAccessException if no cell matches {@code valueSpec}, or if {@code
+   *     value} is {@code null}.
+   */
+  @Override
+  public <T> void set(ValueSpec<T> valueSpec, T value) {
+    final Cell<T> cell = getCellOrThrow(valueSpec);
+    cell.set(value);
+  }
+
+  /**
+   * Marks the state as deleted.
+   *
+   * @throws IllegalStorageAccessException if no cell matches {@code valueSpec}.
+   */
+  @Override
+  public <T> void remove(ValueSpec<T> valueSpec) {
+    final Cell<T> cell = getCellOrThrow(valueSpec);
+    cell.remove();
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> Cell<T> getCellOrThrow(ValueSpec<T> runtimeSpec) {
+    // fast path: the user used the same ValueSpec reference to declare the function
+    // and to index into the state.
+    for (Cell<?> cell : cells) {
+      ValueSpec<?> registeredSpec = cell.spec();
+      if (runtimeSpec == registeredSpec) {
+        return (Cell<T>) cell;
+      }
+    }
+    return slowGetCellOrThrow(runtimeSpec);
+  }
+
+  @SuppressWarnings("unchecked")
+  private <T> Cell<T> slowGetCellOrThrow(ValueSpec<T> valueSpec) {
+    // unlikely slow path: when the users used a different ValueSpec instance in registration
+    // and at runtime. Cells are matched by state name first; a name match with a different
+    // type name is reported as an error instead of being skipped.
+    for (Cell<?> cell : cells) {
+      ValueSpec<?> thisSpec = cell.spec();
+      String thisName = thisSpec.name();
+      if (!thisName.equals(valueSpec.name())) {
+        continue;
+      }
+      if (thisSpec.typeName().equals(valueSpec.typeName())) {
+        return (Cell<T>) cell;
+      }
+      throw new IllegalStorageAccessException(
+          valueSpec.name(),
+          "Accessed state with incorrect type; state type was registered as "
+              + thisSpec.typeName()
+              + ", but was accessed as type "
+              + valueSpec.typeName());
+    }
+    throw new IllegalStorageAccessException(
+        valueSpec.name(), "State does not exist; make sure that this state was registered.");
+  }
+
+  /**
+   * Passes a {@link PersistedValueMutation} to {@code consumer} for every cell that was modified
+   * or deleted; unmodified cells are skipped.
+   */
+  public void addMutations(Consumer<PersistedValueMutation> consumer) {
+    for (Cell<?> cell : cells) {
+      cell.toProtocolValueMutation().ifPresent(consumer);
+    }
+  }
+
+  // ===============================================================================
+  //  Thread-safe state value cells
+  // ===============================================================================
+
+  /** Holder of a single state value and of its modification status. */
+  private interface Cell<T> {
+    Optional<T> get();
+
+    void set(T value);
+
+    void remove();
+
+    /** Returns the mutation to report for this cell, or empty if the cell is unmodified. */
+    Optional<FromFunction.PersistedValueMutation> toProtocolValueMutation();
+
+    ValueSpec<T> spec();
+  }
+
+  /** Deserializes the payload of {@code typedValue}, or returns empty if it carries no value. */
+  private static <T> Optional<T> tryDeserialize(
+      TypeSerializer<T> serializer, TypedValue typedValue) {
+    if (!typedValue.getHasValue()) {
+      return Optional.empty();
+    }
+    Slice slice = SliceProtobufUtil.asSlice(typedValue.getValue());
+    T value = serializer.deserialize(slice);
+    return Optional.ofNullable(value);
+  }
+
+  private static <T> ByteString serialize(TypeSerializer<T> serializer, T value) {
+    Slice slice = serializer.serialize(value);
+    return SliceProtobufUtil.asByteString(slice);
+  }
+
+  /** Builds the MODIFY mutation that reports {@code newValue} for the named state. */
+  private static PersistedValueMutation modifyMutation(String stateName, TypedValue newValue) {
+    return PersistedValueMutation.newBuilder()
+        .setStateName(stateName)
+        .setMutationType(PersistedValueMutation.MutationType.MODIFY)
+        .setStateValue(newValue)
+        .build();
+  }
+
+  /** Builds the DELETE mutation for the named state. */
+  private static PersistedValueMutation deleteMutation(String stateName) {
+    return PersistedValueMutation.newBuilder()
+        .setStateName(stateName)
+        .setMutationType(PersistedValueMutation.MutationType.DELETE)
+        .build();
+  }
+
+  /**
+   * Cell used for types that declare {@link TypeCharacteristics#IMMUTABLE_VALUES}: the user
+   * object is cached as-is and only serialized when the mutation is produced.
+   */
+  private static final class ImmutableTypeCell<T> implements Cell<T> {
+    private final ReentrantLock lock = new ReentrantLock();
+    private final ValueSpec<T> spec;
+    private final TypedValue typedValue;
+    private final TypeSerializer<T> serializer;
+
+    private CellStatus status = CellStatus.UNMODIFIED;
+    private T cachedObject;
+
+    private ImmutableTypeCell(ValueSpec<T> spec, TypedValue typedValue) {
+      this.spec = spec;
+      this.typedValue = typedValue;
+      this.serializer = Objects.requireNonNull(spec.type().typeSerializer());
+    }
+
+    @Override
+    public Optional<T> get() {
+      lock.lock();
+      try {
+        if (status == CellStatus.DELETED) {
+          return Optional.empty();
+        }
+        if (cachedObject != null) {
+          return Optional.of(cachedObject);
+        }
+        // first read: deserialize the value received from the runtime and keep the object.
+        Optional<T> result = tryDeserialize(serializer, typedValue);
+        result.ifPresent(object -> this.cachedObject = object);
+        return result;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void set(T value) {
+      if (value == null) {
+        throw new IllegalStorageAccessException(
+            spec.name(), "Can not set state to NULL. Please use remove() instead.");
+      }
+      lock.lock();
+      try {
+        cachedObject = value;
+        status = CellStatus.MODIFIED;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void remove() {
+      lock.lock();
+      try {
+        cachedObject = null;
+        status = CellStatus.DELETED;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public Optional<PersistedValueMutation> toProtocolValueMutation() {
+      // taken like in every other accessor, so that status and cachedObject are read as a
+      // consistent pair and writes made by other threads are visible here.
+      lock.lock();
+      try {
+        switch (status) {
+          case MODIFIED:
+            final TypedValue newValue =
+                TypedValue.newBuilder()
+                    .setTypename(spec.typeName().asTypeNameString())
+                    .setHasValue(true)
+                    .setValue(serialize(serializer, cachedObject))
+                    .build();
+            return Optional.of(modifyMutation(spec.name(), newValue));
+          case DELETED:
+            return Optional.of(deleteMutation(spec.name()));
+          case UNMODIFIED:
+            return Optional.empty();
+          default:
+            throw new IllegalStateException("Unknown cell status: " + status);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public ValueSpec<T> spec() {
+      return spec;
+    }
+  }
+
+  /**
+   * Cell used for all other types: only the serialized form is kept, so every {@link #get()}
+   * deserializes a fresh object and every {@link #set(Object)} serializes immediately.
+   */
+  private static final class MutableTypeCell<T> implements Cell<T> {
+    private final ReentrantLock lock = new ReentrantLock();
+
+    private final TypeSerializer<T> serializer;
+    private final ValueSpec<T> spec;
+    private TypedValue typedValue;
+    private CellStatus status = CellStatus.UNMODIFIED;
+
+    private MutableTypeCell(ValueSpec<T> spec, TypedValue typedValue) {
+      this.spec = spec;
+      this.typedValue = typedValue;
+      this.serializer = Objects.requireNonNull(spec.type().typeSerializer());
+    }
+
+    @Override
+    public Optional<T> get() {
+      lock.lock();
+      try {
+        if (status == CellStatus.DELETED) {
+          return Optional.empty();
+        }
+        return tryDeserialize(serializer, typedValue);
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void set(T value) {
+      if (value == null) {
+        throw new IllegalStorageAccessException(
+            spec.name(), "Can not set state to NULL. Please use remove() instead.");
+      }
+      lock.lock();
+      try {
+        // the typename is taken from the spec (as ImmutableTypeCell does) instead of being
+        // inherited from the received value, which may not carry one.
+        this.typedValue =
+            this.typedValue
+                .toBuilder()
+                .setTypename(spec.typeName().asTypeNameString())
+                .setHasValue(true)
+                .setValue(serialize(serializer, value))
+                .build();
+        this.status = CellStatus.MODIFIED;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public void remove() {
+      lock.lock();
+      try {
+        status = CellStatus.DELETED;
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public Optional<PersistedValueMutation> toProtocolValueMutation() {
+      // taken like in every other accessor, so that status and typedValue are read as a
+      // consistent pair and writes made by other threads are visible here.
+      lock.lock();
+      try {
+        switch (status) {
+          case MODIFIED:
+            return Optional.of(modifyMutation(spec.name(), typedValue));
+          case DELETED:
+            return Optional.of(deleteMutation(spec.name()));
+          case UNMODIFIED:
+            return Optional.empty();
+          default:
+            throw new IllegalStateException("Unknown cell status: " + status);
+        }
+      } finally {
+        lock.unlock();
+      }
+    }
+
+    @Override
+    public ValueSpec<T> spec() {
+      return spec;
+    }
+  }
+
+  private enum CellStatus {
+    UNMODIFIED,
+    MODIFIED,
+    DELETED
+  }
+
+  /**
+   * Creates the cell for every state value: an {@link ImmutableTypeCell} if the type declares
+   * {@link TypeCharacteristics#IMMUTABLE_VALUES}, a {@link MutableTypeCell} otherwise.
+   */
+  private static List<Cell<?>> createCells(List<StateValueContext<?>> stateValues) {
+    final List<Cell<?>> cells = new ArrayList<>(stateValues.size());
+
+    for (StateValueContext<?> stateValueContext : stateValues) {
+      final TypedValue typedValue = stateValueContext.protocolValue().getStateValue();
+      final ValueSpec<?> spec = stateValueContext.spec();
+      // raw types: the wildcard of the spec can not be named as the cell's type argument.
+      @SuppressWarnings({"unchecked", "rawtypes"})
+      final Cell<?> cell =
+          spec.type().typeCharacteristics().contains(TypeCharacteristics.IMMUTABLE_VALUES)
+              ? new ImmutableTypeCell(spec, typedValue)
+              : new MutableTypeCell(spec, typedValue);
+
+      cells.add(cell);
+    }
+
+    return cells;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java
new file mode 100644
index 0000000..3a84840
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/IllegalStorageAccessException.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+/**
+ * Signals an invalid access to a state value. The message combines the name of the accessed
+ * state with a description of the problem.
+ */
+public final class IllegalStorageAccessException extends RuntimeException {
+
+  private static final long serialVersionUID = 1;
+
+  /**
+   * Creates the exception with the message {@code "Error accessing state <stateName>;
+   * <message>"}.
+   *
+   * <p>Package-private: the class is final, so a {@code protected} constructor could never be
+   * reached through a subclass and granted nothing beyond package access.
+   *
+   * @param stateName name of the state that was accessed.
+   * @param message description of what went wrong.
+   */
+  IllegalStorageAccessException(String stateName, String message) {
+    super("Error accessing state " + stateName + "; " + message);
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java
new file mode 100644
index 0000000..009c7aa
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/storage/StateValueContexts.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.annotations.Internal;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+
+/**
+ * Utility for pairing registered {@link ValueSpec}s with values provided by the protocol's {@link
+ * ToFunction} message.
+ */
+@Internal
+public final class StateValueContexts {
+
+  /** Static utility holder, never instantiated. */
+  private StateValueContexts() {}
+
+  /** A registered {@link ValueSpec} together with the protocol value that was found for it. */
+  public static final class StateValueContext<T> {
+    private final ValueSpec<T> spec;
+    private final ToFunction.PersistedValue protocolValue;
+
+    StateValueContext(ValueSpec<T> spec, ToFunction.PersistedValue protocolValue) {
+      this.spec = Objects.requireNonNull(spec);
+      this.protocolValue = Objects.requireNonNull(protocolValue);
+    }
+
+    public ValueSpec<T> spec() {
+      return spec;
+    }
+
+    public ToFunction.PersistedValue protocolValue() {
+      return protocolValue;
+    }
+  }
+
+  /**
+   * Outcome of {@link #resolve(Map, List)}. Exactly one of the two lists is available: if {@link
+   * #hasMissingValues()} returns {@code true} then {@link #resolved()} is {@code null},
+   * otherwise {@link #missingValues()} is {@code null}.
+   */
+  public static final class ResolutionResult {
+    private final List<StateValueContext<?>> resolved;
+    private final List<ValueSpec<?>> missingValues;
+
+    private ResolutionResult(
+        List<StateValueContext<?>> resolved, List<ValueSpec<?>> missingValues) {
+      this.resolved = resolved;
+      this.missingValues = missingValues;
+    }
+
+    /** Returns {@code true} if at least one registered spec had no value in the request. */
+    public boolean hasMissingValues() {
+      return missingValues != null;
+    }
+
+    /** Returns the resolved state values, or {@code null} if there are missing values. */
+    public List<StateValueContext<?>> resolved() {
+      return resolved;
+    }
+
+    /** Returns the specs without a provided value, or {@code null} if there are none. */
+    public List<ValueSpec<?>> missingValues() {
+      return missingValues;
+    }
+  }
+
+  /**
+   * Tries to resolve any registered states that are known to us by the {@link ValueSpec} with the
+   * states provided to us by the runtime.
+   *
+   * @param registeredSpecs the specs registered by the user; only the map's values are used.
+   * @param protocolProvidedValues the state values sent by the runtime, matched by state name.
+   * @return a result that holds either a resolved value for every spec, or the specs for which
+   *     no value was provided.
+   */
+  public static ResolutionResult resolve(
+      Map<String, ValueSpec<?>> registeredSpecs,
+      List<ToFunction.PersistedValue> protocolProvidedValues) {
+
+    // holds a set of missing ValueSpec's. a missing ValueSpec is a value spec that was
+    // registered by the user but wasn't sent to the SDK by the runtime.
+    // this can happen upon an initial request.
+    // allocated lazily, to optimize for normal execution, where states aren't missing.
+    List<ValueSpec<?>> statesWithMissingValue = null;
+
+    // holds the StateValueContext that will be used to serialize and deserialize user state.
+    final List<StateValueContext<?>> resolvedStateValues = new ArrayList<>(registeredSpecs.size());
+
+    for (ValueSpec<?> spec : registeredSpecs.values()) {
+      final ToFunction.PersistedValue persistedValue =
+          findPersistedValueByName(protocolProvidedValues, spec.name());
+
+      if (persistedValue != null) {
+        resolvedStateValues.add(new StateValueContext<>(spec, persistedValue));
+        continue;
+      }
+      // oh no. the runtime doesn't know (yet) about a state that was registered by the user.
+      // we need to collect these.
+      if (statesWithMissingValue == null) {
+        statesWithMissingValue = new ArrayList<>(registeredSpecs.size());
+      }
+      statesWithMissingValue.add(spec);
+    }
+
+    if (statesWithMissingValue == null) {
+      return new ResolutionResult(resolvedStateValues, null);
+    }
+    // a partial resolution is not handed out: as soon as one state is missing, only the
+    // missing specs are reported.
+    return new ResolutionResult(null, statesWithMissingValue);
+  }
+
+  /**
+   * Finds a {@linkplain org.apache.flink.statefun.sdk.reqreply.generated.ToFunction.PersistedValue}
+   * with a given name, or returns {@code null} if there is none. The size of the list is, in
+   * practice, expected to be very small (0-10 items), so a plain linear search is used.
+   */
+  private static ToFunction.PersistedValue findPersistedValueByName(
+      List<ToFunction.PersistedValue> protocolProvidedValues, String name) {
+    for (ToFunction.PersistedValue value : protocolProvidedValues) {
+      if (Objects.equals(value.getStateName(), name)) {
+        return value;
+      }
+    }
+    return null;
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
new file mode 100644
index 0000000..71d5488
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/SimpleType.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Objects;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+
+/**
+ * A {@link Type} that is defined by a {@link TypeName} and a pair of functions converting values
+ * to and from {@code byte[]}.
+ */
+public final class SimpleType<T> implements Type<T> {
+
+  /** A function that is allowed to throw anything, used for the (de)serialization callbacks. */
+  @FunctionalInterface
+  public interface Fn<I, O> {
+    O apply(I input) throws Throwable;
+  }
+
+  /** Creates a type without any {@link TypeCharacteristics}. */
+  public static <T> Type<T> simpleTypeFrom(
+      TypeName typeName, Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+    return new SimpleType<>(typeName, serialize, deserialize, Collections.emptySet());
+  }
+
+  /** Creates a type that declares {@link TypeCharacteristics#IMMUTABLE_VALUES}. */
+  public static <T> Type<T> simpleImmutableTypeFrom(
+      TypeName typeName, Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+    return new SimpleType<>(
+        typeName, serialize, deserialize, EnumSet.of(TypeCharacteristics.IMMUTABLE_VALUES));
+  }
+
+  private final TypeName typeName;
+  private final TypeSerializer<T> serializer;
+  private final Set<TypeCharacteristics> typeCharacteristics;
+
+  /**
+   * @param typeName the name of this type, must not be {@code null}.
+   * @param serialize converts a value to bytes, must not be {@code null}.
+   * @param deserialize converts bytes to a value, must not be {@code null}.
+   * @param typeCharacteristics the characteristics of this type, possibly empty; the set is
+   *     copied.
+   * @throws NullPointerException if any argument is {@code null}.
+   */
+  public SimpleType(
+      TypeName typeName,
+      Fn<T, byte[]> serialize,
+      Fn<byte[], T> deserialize,
+      Set<TypeCharacteristics> typeCharacteristics) {
+    this.typeName = Objects.requireNonNull(typeName);
+    this.serializer = new Serializer<>(serialize, deserialize);
+    // EnumSet.copyOf(Collection) rejects an empty collection that is not itself an EnumSet
+    // (such as Collections.emptySet()) with an IllegalArgumentException, therefore the
+    // defensive copy is built from an empty EnumSet instead.
+    final EnumSet<TypeCharacteristics> copy = EnumSet.noneOf(TypeCharacteristics.class);
+    copy.addAll(Objects.requireNonNull(typeCharacteristics));
+    this.typeCharacteristics = Collections.unmodifiableSet(copy);
+  }
+
+  @Override
+  public TypeName typeName() {
+    return typeName;
+  }
+
+  @Override
+  public TypeSerializer<T> typeSerializer() {
+    return serializer;
+  }
+
+  /** Returns an unmodifiable view of the characteristics given at construction. */
+  @Override
+  public Set<TypeCharacteristics> typeCharacteristics() {
+    return typeCharacteristics;
+  }
+
+  /**
+   * Adapts the two {@link Fn}s to a {@link TypeSerializer}. Anything thrown by the functions is
+   * rethrown wrapped in an {@link IllegalStateException}.
+   */
+  private static final class Serializer<T> implements TypeSerializer<T> {
+    private final Fn<T, byte[]> serialize;
+    private final Fn<byte[], T> deserialize;
+
+    private Serializer(Fn<T, byte[]> serialize, Fn<byte[], T> deserialize) {
+      this.serialize = Objects.requireNonNull(serialize);
+      this.deserialize = Objects.requireNonNull(deserialize);
+    }
+
+    @Override
+    public Slice serialize(T value) {
+      try {
+        byte[] bytes = serialize.apply(value);
+        return Slices.wrap(bytes);
+      } catch (Throwable throwable) {
+        throw new IllegalStateException(throwable);
+      }
+    }
+
+    @Override
+    public T deserialize(Slice input) {
+      try {
+        byte[] bytes = input.toByteArray();
+        return deserialize.apply(bytes);
+      } catch (Throwable throwable) {
+        throw new IllegalStateException(throwable);
+      }
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
new file mode 100644
index 0000000..7343b5a
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Type.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import java.util.Collections;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+
+/**
+ * Describes a value type of {@code T}: its {@link TypeName}, the {@link TypeSerializer} used to
+ * convert values of that type, and an optional set of {@link TypeCharacteristics}.
+ */
+public interface Type<T> {
+
+  /** Returns the name of this type. */
+  TypeName typeName();
+
+  /** Returns the serializer for values of this type. */
+  TypeSerializer<T> typeSerializer();
+
+  /** Returns the characteristics of this type; an empty set unless overridden. */
+  default Set<TypeCharacteristics> typeCharacteristics() {
+    return Collections.emptySet();
+  }
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java
new file mode 100644
index 0000000..c29bde2
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeCharacteristics.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+/** Optional properties that a {@link Type} can declare via {@link Type#typeCharacteristics()}. */
+public enum TypeCharacteristics {
+  // As the name indicates, declares that values of the type are immutable.
+  // ConcurrentAddressScopedStorage checks for this constant when it picks the kind of cell
+  // that holds a state value.
+  IMMUTABLE_VALUES
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java
new file mode 100644
index 0000000..c60e802
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/TypeSerializer.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+
+/** Converts values of type {@code T} to and from {@link Slice}s. */
+public interface TypeSerializer<T> {
+
+  /** Returns the serialized form of {@code value}. */
+  Slice serialize(T value);
+
+  /** Returns the value read from {@code bytes}. */
+  T deserialize(Slice bytes);
+}
diff --git a/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java
new file mode 100644
index 0000000..daa4fbf
--- /dev/null
+++ b/statefun-sdk-java/src/main/java/org/apache/flink/statefun/sdk/java/types/Types.java
@@ -0,0 +1,456 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import static org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil.parseFrom;
+import static org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil.toSlice;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.google.protobuf.MoreByteStrings;
+import com.google.protobuf.WireFormat;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Set;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.types.generated.BooleanWrapper;
+import org.apache.flink.statefun.sdk.types.generated.DoubleWrapper;
+import org.apache.flink.statefun.sdk.types.generated.FloatWrapper;
+import org.apache.flink.statefun.sdk.types.generated.IntWrapper;
+import org.apache.flink.statefun.sdk.types.generated.LongWrapper;
+import org.apache.flink.statefun.sdk.types.generated.StringWrapper;
+
+public final class Types {
+  // This class only holds constants and static factories, so it is never instantiated.
+  private Types() {}
+
+  // primitives
+  // Type names of the built-in primitive types; they all share the "io.statefun.types/" prefix.
+  public static final TypeName BOOLEAN_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/bool");
+  public static final TypeName INTEGER_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/int");
+  public static final TypeName FLOAT_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/float");
+  public static final TypeName LONG_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/long");
+  public static final TypeName DOUBLE_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/double");
+  public static final TypeName STRING_TYPENAME =
+      TypeName.typeNameFromString("io.statefun.types/string");
+
+  // common characteristics
+  // Unmodifiable set containing only IMMUTABLE_VALUES; every built-in type below returns this
+  // same instance from typeCharacteristics().
+  private static final Set<TypeCharacteristics> IMMUTABLE_TYPE_CHARS =
+      Collections.unmodifiableSet(EnumSet.of(TypeCharacteristics.IMMUTABLE_VALUES));
+
+  /** Returns the shared {@link Type} instance for {@code Long} values. */
+  public static Type<Long> longType() {
+    return LongType.INSTANCE;
+  }
+
+  /** Returns the shared {@link Type} instance for {@code String} values. */
+  public static Type<String> stringType() {
+    return StringType.INSTANCE;
+  }
+
+  /** Returns the shared {@link Type} instance for {@code Integer} values. */
+  public static Type<Integer> integerType() {
+    return IntegerType.INSTANCE;
+  }
+
+  /** Returns the shared {@link Type} instance for {@code Boolean} values. */
+  public static Type<Boolean> booleanType() {
+    return BooleanType.INSTANCE;
+  }
+
+  /** Returns the shared {@link Type} instance for {@code Float} values. */
+  public static Type<Float> floatType() {
+    return FloatType.INSTANCE;
+  }
+
+  /** Returns the shared {@link Type} instance for {@code Double} values. */
+  public static Type<Double> doubleType() {
+    return DoubleType.INSTANCE;
+  }
+
+  /**
+   * Compute the Protobuf field tag, as specified by the Protobuf wire format. See {@code
+   * WireFormat#makeTag(int, int)}. NOTE: currently, for all StateFun provided wire types the tag
+   * must fit in a single byte.
+   *
+   * <p>A tag is written on the wire as a varint, so it occupies exactly one byte only when its
+   * value lies in {@code [0, 127]}. Any other value, including a negative one caused by an
+   * oversized field number, is rejected.
+   *
+   * @param fieldNumber the field number as specified in the message definition.
+   * @param wireType the wire type of the field (a {@code WireFormat.WIRETYPE_*} constant).
+   * @return the field tag as a single byte.
+   * @throws IllegalStateException if the tag cannot be encoded as a single byte.
+   */
+  private static byte protobufTagAsSingleByte(int fieldNumber, int wireType) {
+    final int fieldTag = (fieldNumber << 3) | wireType;
+    // a single-byte varint has its high bit clear, hence the [0, 127] range.
+    if (fieldTag < 0 || fieldTag > 127) {
+      throw new IllegalStateException(
+          "Protobuf field tag " + fieldTag + " does not fit in a single byte.");
+    }
+    return (byte) fieldTag;
+  }
+
+  // Shared zero-length slice: the long, int and string serializers below return it for their
+  // default values (0 and "") instead of allocating a new slice each time.
+  private static final Slice EMPTY_SLICE = Slices.wrap(new byte[] {});
+
+  /**
+   * The built-in {@link Type} of {@code Long} values: named {@link #LONG_TYPENAME}, serialized by
+   * a {@link LongTypeSerializer}, and declaring {@link TypeCharacteristics#IMMUTABLE_VALUES}.
+   */
+  private static final class LongType implements Type<Long> {
+
+    /** The only instance, handed out by {@link Types#longType()}. */
+    static final Type<Long> INSTANCE = new LongType();
+
+    private final TypeSerializer<Long> serializer = new LongTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return LONG_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Long> typeSerializer() {
+      return serializer;
+    }
+
+    @Override
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  /**
+   * Serializer for {@code Long} values. A non-zero value is written as nine bytes: the tag of the
+   * {@code LongWrapper} value field (fixed 64-bit wire type) followed by the eight bytes of the
+   * number, least significant byte first. Zero is written as an empty slice.
+   */
+  private static final class LongTypeSerializer implements TypeSerializer<Long> {
+
+    private static final byte WRAPPER_TYPE_FIELD_TAG =
+        protobufTagAsSingleByte(LongWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_FIXED64);
+
+    @Override
+    public Slice serialize(Long element) {
+      return serializeLongWrapperCompatibleLong(element);
+    }
+
+    @Override
+    public Long deserialize(Slice input) {
+      return deserializeLongWrapperCompatibleLong(input);
+    }
+
+    /** Encodes {@code n} as tag + 8 little-endian bytes, or as no bytes at all when it is zero. */
+    private static Slice serializeLongWrapperCompatibleLong(long n) {
+      if (n == 0) {
+        return EMPTY_SLICE;
+      }
+      final byte[] encoded = new byte[1 + Long.BYTES];
+      encoded[0] = WRAPPER_TYPE_FIELD_TAG;
+      for (int i = 0; i < Long.BYTES; i++) {
+        // the narrowing cast keeps exactly the lowest eight bits of the shifted value.
+        encoded[1 + i] = (byte) (n >>> (i * Byte.SIZE));
+      }
+      return Slices.wrap(encoded);
+    }
+
+    /** Decodes what {@link #serializeLongWrapperCompatibleLong(long)} produces. */
+    private static long deserializeLongWrapperCompatibleLong(Slice slice) {
+      if (slice.readableBytes() == 0) {
+        return 0;
+      }
+      if (slice.byteAt(0) != WRAPPER_TYPE_FIELD_TAG) {
+        throw new IllegalStateException("Not a LongWrapper");
+      }
+      long decoded = 0;
+      for (int i = 0; i < Long.BYTES; i++) {
+        // mask with a long constant so that the byte is widened without sign extension.
+        decoded |= (slice.byteAt(1 + i) & 0xFFL) << (i * Byte.SIZE);
+      }
+      return decoded;
+    }
+  }
+
+  /**
+   * The built-in {@link Type} of {@code String} values: named {@link #STRING_TYPENAME},
+   * serialized by a {@link StringTypeSerializer}, and declaring {@link
+   * TypeCharacteristics#IMMUTABLE_VALUES}.
+   */
+  private static final class StringType implements Type<String> {
+
+    /** The only instance, handed out by {@link Types#stringType()}. */
+    static final Type<String> INSTANCE = new StringType();
+
+    private final TypeSerializer<String> serializer = new StringTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return STRING_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<String> typeSerializer() {
+      return serializer;
+    }
+
+    @Override
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  /**
+   * Serializer for {@code String} values. A non-empty string is written as the tag of the {@code
+   * StringWrapper} value field (length-delimited wire type), the length of its UTF-8 encoding as a
+   * VarInt32, and then the UTF-8 bytes. The empty string is written as an empty slice.
+   */
+  private static final class StringTypeSerializer implements TypeSerializer<String> {
+
+    private static final byte STRING_WRAPPER_FIELD_TYPE =
+        protobufTagAsSingleByte(
+            StringWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_LENGTH_DELIMITED);
+
+    /** The largest number of bytes a VarInt32 can occupy (7 payload bits per byte). */
+    private static final int MAX_VARINT32_BYTES = 5;
+
+    @Override
+    public Slice serialize(String element) {
+      return serializeStringWrapperCompatibleString(element);
+    }
+
+    @Override
+    public String deserialize(Slice input) {
+      return deserializeStringWrapperCompatibleString(input);
+    }
+
+    /** Encodes {@code element} as tag + VarInt32 length + UTF-8 bytes (nothing when empty). */
+    private static Slice serializeStringWrapperCompatibleString(String element) {
+      if (element.isEmpty()) {
+        return EMPTY_SLICE;
+      }
+      ByteString utf8 = ByteString.copyFromUtf8(element);
+      // The header holds only the field tag and the length VarInt. The UTF-8 payload is
+      // concatenated below and never copied into this array, so it must not be sized for it.
+      byte[] header = new byte[1 + MAX_VARINT32_BYTES];
+      int position = 0;
+      // write the field tag
+      header[position++] = STRING_WRAPPER_FIELD_TYPE;
+      // write utf8 bytes.length as a VarInt.
+      int varIntLen = utf8.size();
+      while ((varIntLen & -128) != 0) {
+        header[position++] = ((byte) (varIntLen & 127 | 128));
+        varIntLen >>>= 7;
+      }
+      header[position++] = ((byte) varIntLen);
+      // concat header and the utf8 string bytes
+      ByteString headerBuf = MoreByteStrings.wrap(header, 0, position);
+      ByteString result = MoreByteStrings.concat(headerBuf, utf8);
+      return SliceProtobufUtil.asSlice(result);
+    }
+
+    /**
+     * Decodes what {@link #serializeStringWrapperCompatibleString(String)} produces.
+     *
+     * @throws IllegalStateException if the input does not start with the expected field tag, or
+     *     if the length VarInt is not terminated within five bytes or exceeds {@code
+     *     Integer.MAX_VALUE}.
+     */
+    private static String deserializeStringWrapperCompatibleString(Slice input) {
+      if (input.readableBytes() == 0) {
+        return "";
+      }
+      ByteString buf = SliceProtobufUtil.asByteString(input);
+      int position = 0;
+      // read field tag
+      if (buf.byteAt(position++) != STRING_WRAPPER_FIELD_TYPE) {
+        throw new IllegalStateException("Not a StringWrapper");
+      }
+      // read VarInt32 length
+      int shift = 0;
+      long varIntSize = 0;
+      boolean terminated = false;
+      while (shift < 32) {
+        final byte b = buf.byteAt(position++);
+        varIntSize |= (long) (b & 0x7F) << shift;
+        if ((b & 0x80) == 0) {
+          terminated = true;
+          break;
+        }
+        shift += 7;
+      }
+      // sanity checks: a length whose last permitted byte still has the continuation bit set
+      // would otherwise be silently truncated and the payload read from the wrong offset.
+      if (!terminated || varIntSize < 0 || varIntSize > Integer.MAX_VALUE) {
+        throw new IllegalStateException("Malformed VarInt");
+      }
+      final int endIndex = position + (int) varIntSize;
+      ByteString utf8Bytes = buf.substring(position, endIndex);
+      return utf8Bytes.toStringUtf8();
+    }
+  }
+
+  /**
+   * The built-in {@link Type} of {@code Integer} values: named {@link #INTEGER_TYPENAME},
+   * serialized by an {@link IntegerTypeSerializer}, and declaring {@link
+   * TypeCharacteristics#IMMUTABLE_VALUES}.
+   */
+  private static final class IntegerType implements Type<Integer> {
+
+    /** The only instance, handed out by {@link Types#integerType()}. */
+    static final Type<Integer> INSTANCE = new IntegerType();
+
+    private final TypeSerializer<Integer> serializer = new IntegerTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return INTEGER_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Integer> typeSerializer() {
+      return serializer;
+    }
+
+    @Override
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  /**
+   * Serializer for {@code Integer} values. A non-zero value is written as five bytes: the tag of
+   * the {@code IntWrapper} value field (fixed 32-bit wire type) followed by the four bytes of the
+   * number, least significant byte first. Zero is written as an empty slice.
+   */
+  private static final class IntegerTypeSerializer implements TypeSerializer<Integer> {
+    private static final byte WRAPPER_TYPE_FIELD_TAG =
+        protobufTagAsSingleByte(IntWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_FIXED32);
+
+    @Override
+    public Slice serialize(Integer element) {
+      return serializeIntegerWrapperCompatibleInt(element);
+    }
+
+    @Override
+    public Integer deserialize(Slice input) {
+      return deserializeIntegerWrapperCompatibleInt(input);
+    }
+
+    /** Encodes {@code n} as tag + 4 little-endian bytes, or as no bytes at all when it is zero. */
+    private static Slice serializeIntegerWrapperCompatibleInt(int n) {
+      if (n == 0) {
+        return EMPTY_SLICE;
+      }
+      final byte[] encoded = new byte[1 + Integer.BYTES];
+      encoded[0] = WRAPPER_TYPE_FIELD_TAG;
+      for (int i = 0; i < Integer.BYTES; i++) {
+        // the narrowing cast keeps exactly the lowest eight bits of the shifted value.
+        encoded[1 + i] = (byte) (n >>> (i * Byte.SIZE));
+      }
+      return Slices.wrap(encoded);
+    }
+
+    /** Decodes what {@link #serializeIntegerWrapperCompatibleInt(int)} produces. */
+    private static int deserializeIntegerWrapperCompatibleInt(Slice slice) {
+      if (slice.readableBytes() == 0) {
+        return 0;
+      }
+      if (slice.byteAt(0) != WRAPPER_TYPE_FIELD_TAG) {
+        throw new IllegalStateException("Not an IntWrapper");
+      }
+      int decoded = 0;
+      for (int i = 0; i < Integer.BYTES; i++) {
+        // mask so that the byte is widened without sign extension.
+        decoded |= (slice.byteAt(1 + i) & 0xFF) << (i * Byte.SIZE);
+      }
+      return decoded;
+    }
+  }
+
+  /**
+   * The built-in {@link Type} of {@code Boolean} values: named {@link #BOOLEAN_TYPENAME},
+   * serialized by a {@link BooleanTypeSerializer}, and declaring {@link
+   * TypeCharacteristics#IMMUTABLE_VALUES}.
+   */
+  private static final class BooleanType implements Type<Boolean> {
+
+    /** The only instance, handed out by {@link Types#booleanType()}. */
+    static final Type<Boolean> INSTANCE = new BooleanType();
+
+    private final TypeSerializer<Boolean> serializer = new BooleanTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return BOOLEAN_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Boolean> typeSerializer() {
+      return serializer;
+    }
+
+    @Override
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  /**
+   * Serializer for {@code Boolean} values. Both possible outputs are the pre-built serialized
+   * forms of a {@code BooleanWrapper} message; reading accepts an empty slice as {@code false} and
+   * the two bytes (value-field tag, 1) as {@code true}.
+   */
+  private static final class BooleanTypeSerializer implements TypeSerializer<Boolean> {
+    private static final byte WRAPPER_TYPE_TAG =
+        protobufTagAsSingleByte(BooleanWrapper.VALUE_FIELD_NUMBER, WireFormat.WIRETYPE_VARINT);
+    private static final Slice TRUE_SLICE =
+        toSlice(BooleanWrapper.newBuilder().setValue(true).build());
+    private static final Slice FALSE_SLICE =
+        toSlice(BooleanWrapper.newBuilder().setValue(false).build());
+
+    @Override
+    public Slice serialize(Boolean element) {
+      if (element) {
+        return TRUE_SLICE;
+      }
+      return FALSE_SLICE;
+    }
+
+    @Override
+    public Boolean deserialize(Slice input) {
+      if (input.readableBytes() == 0) {
+        return false;
+      }
+      // both bytes are read before either of them is validated.
+      final byte first = input.byteAt(0);
+      final byte second = input.byteAt(1);
+      if (first == WRAPPER_TYPE_TAG && second == (byte) 1) {
+        return true;
+      }
+      throw new IllegalStateException("Not a BooleanWrapper value.");
+    }
+  }
+
+  /**
+   * The built-in {@link Type} of {@code Float} values: named {@link #FLOAT_TYPENAME}, serialized
+   * by a {@link FloatTypeSerializer}, and declaring {@link TypeCharacteristics#IMMUTABLE_VALUES}.
+   */
+  private static final class FloatType implements Type<Float> {
+
+    /** The only instance, handed out by {@link Types#floatType()}. */
+    static final Type<Float> INSTANCE = new FloatType();
+
+    private final TypeSerializer<Float> serializer = new FloatTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return FLOAT_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Float> typeSerializer() {
+      return serializer;
+    }
+
+    @Override
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  /** Serializer for {@code Float} values that goes through a {@code FloatWrapper} message. */
+  private static final class FloatTypeSerializer implements TypeSerializer<Float> {
+
+    @Override
+    public Slice serialize(Float element) {
+      return toSlice(FloatWrapper.newBuilder().setValue(element).build());
+    }
+
+    /**
+     * @throws IllegalArgumentException if the input cannot be parsed as a {@code FloatWrapper}.
+     */
+    @Override
+    public Float deserialize(Slice input) {
+      final FloatWrapper parsed;
+      try {
+        parsed = parseFrom(FloatWrapper.parser(), input);
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return parsed.getValue();
+    }
+  }
+
+  /**
+   * The built-in {@link Type} of {@code Double} values: named {@link #DOUBLE_TYPENAME},
+   * serialized by a {@link DoubleTypeSerializer}, and declaring {@link
+   * TypeCharacteristics#IMMUTABLE_VALUES}.
+   */
+  private static final class DoubleType implements Type<Double> {
+
+    /** The only instance, handed out by {@link Types#doubleType()}. */
+    static final Type<Double> INSTANCE = new DoubleType();
+
+    private final TypeSerializer<Double> serializer = new DoubleTypeSerializer();
+
+    @Override
+    public TypeName typeName() {
+      return DOUBLE_TYPENAME;
+    }
+
+    @Override
+    public TypeSerializer<Double> typeSerializer() {
+      return serializer;
+    }
+
+    @Override
+    public Set<TypeCharacteristics> typeCharacteristics() {
+      return IMMUTABLE_TYPE_CHARS;
+    }
+  }
+
+  /** Serializer for {@code Double} values that goes through a {@code DoubleWrapper} message. */
+  private static final class DoubleTypeSerializer implements TypeSerializer<Double> {
+
+    @Override
+    public Slice serialize(Double element) {
+      return toSlice(DoubleWrapper.newBuilder().setValue(element).build());
+    }
+
+    /**
+     * @throws IllegalArgumentException if the input cannot be parsed as a {@code DoubleWrapper}.
+     */
+    @Override
+    public Double deserialize(Slice input) {
+      final DoubleWrapper parsed;
+      try {
+        parsed = parseFrom(DoubleWrapper.parser(), input);
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalArgumentException(e);
+      }
+      return parsed.getValue();
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java
new file mode 100644
index 0000000..4c0a116
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/ConcurrentRequestReplyHandlerTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static java.util.Collections.singletonMap;
+import static org.apache.flink.statefun.sdk.java.handler.TestUtils.modifiedValue;
+import static org.apache.flink.statefun.sdk.java.handler.TestUtils.protoFromValueSpec;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import org.apache.flink.statefun.sdk.java.Context;
+import org.apache.flink.statefun.sdk.java.StatefulFunction;
+import org.apache.flink.statefun.sdk.java.StatefulFunctionSpec;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.handler.TestUtils.RequestBuilder;
+import org.apache.flink.statefun.sdk.java.message.Message;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.junit.Test;
+
+/**
+ * Tests {@link ConcurrentRequestReplyHandler} by sending it {@code ToFunction} requests addressed
+ * to a single registered function, {@link SimpleGreeter}, and inspecting the {@code FromFunction}
+ * responses.
+ */
+public class ConcurrentRequestReplyHandlerTest {
+
+  private static final TypeName GREETER_TYPE = TypeName.typeNameFromString("example/greeter");
+
+  private static final ValueSpec<Integer> SEEN_INT_SPEC =
+      ValueSpec.named("seen").thatExpiresAfterReadOrWrite(Duration.ofDays(1)).withIntType();
+
+  private static final StatefulFunctionSpec GREET_FN_SPEC =
+      StatefulFunctionSpec.builder(GREETER_TYPE)
+          .withValueSpec(SEEN_INT_SPEC)
+          .withSupplier(SimpleGreeter::new)
+          .build();
+
+  private final ConcurrentRequestReplyHandler handlerUnderTest =
+      new ConcurrentRequestReplyHandler(singletonMap(GREETER_TYPE, GREET_FN_SPEC));
+
+  /** A request that provides the declared state yields a response. */
+  @Test
+  public void simpleInvocationExample() {
+    RequestBuilder request = greeterRequest();
+    request.withState(SEEN_INT_SPEC, 1023);
+    request.withInvocation(Types.stringType(), "Hello world");
+
+    FromFunction response = invoke(request.build());
+
+    assertThat(response, notNullValue());
+  }
+
+  /** A request that omits the declared state is answered with that state reported as missing. */
+  @Test
+  public void invocationWithoutStateDefinition() {
+    RequestBuilder request = greeterRequest();
+    request.withInvocation(Types.stringType(), "Hello world");
+
+    FromFunction response = invoke(request.build());
+
+    assertThat(
+        response.getIncompleteInvocationContext().getMissingValuesList(),
+        hasItem(protoFromValueSpec(SEEN_INT_SPEC)));
+  }
+
+  /** Three invocations in one batch, starting from 0, leave the counter modified to 3. */
+  @Test
+  public void multipleInvocations() {
+    RequestBuilder request = greeterRequest();
+    request.withState(SEEN_INT_SPEC, 0);
+    for (String argument : new String[] {"a", "b", "c"}) {
+      request.withInvocation(Types.stringType(), argument);
+    }
+
+    FromFunction response = invoke(request.build());
+
+    assertThat(
+        response.getInvocationResult().getStateMutationsList(),
+        hasItem(modifiedValue(SEEN_INT_SPEC, 3)));
+  }
+
+  /** Starts a request that targets the greeter function with the id {@code "0"}. */
+  private static RequestBuilder greeterRequest() {
+    return new RequestBuilder().withTarget(GREETER_TYPE, "0");
+  }
+
+  /** Passes the request to the handler under test and waits for the response. */
+  private FromFunction invoke(ToFunction request) {
+    return handlerUnderTest.handleInternally(request).join();
+  }
+
+  /** Prints each incoming message and increments the {@code seen} counter by one. */
+  private static final class SimpleGreeter implements StatefulFunction {
+
+    @Override
+    public CompletableFuture<Void> apply(Context context, Message argument) {
+      // an absent counter is treated as zero.
+      final int seenSoFar = context.storage().get(SEEN_INT_SPEC).orElse(0);
+      System.out.println("Hello there " + argument.asUtf8String() + " state=" + seenSoFar);
+
+      final int seenNow = seenSoFar + 1;
+      context.storage().set(SEEN_INT_SPEC, seenNow);
+
+      return CompletableFuture.completedFuture(null);
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java
new file mode 100644
index 0000000..d861d5a
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/MoreFuturesTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import static org.apache.flink.statefun.sdk.java.handler.MoreFutures.applySequentially;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.stream.IntStream;
+import org.junit.Test;
+
+/** Tests for {@code MoreFutures.applySequentially}. */
+public class MoreFuturesTest {
+
+  /** The asynchronous side effects are observed in the order of the input elements. */
+  @Test
+  public void usageExample() {
+    final ConcurrentLinkedDeque<String> results = new ConcurrentLinkedDeque<>();
+
+    CompletableFuture<?> allDone =
+        applySequentially(
+            Arrays.asList("a", "b", "c"),
+            letter -> CompletableFuture.runAsync(() -> results.addLast(letter)));
+
+    allDone.join();
+
+    assertThat(results, contains("a", "b", "c"));
+  }
+
+  /** A function that throws on the first element fails the resulting future. */
+  @Test
+  public void firstThrows() {
+    CompletableFuture<Void> allDone =
+        applySequentially(Arrays.asList("a", "b", "c"), throwing("a"));
+
+    assertThat(allDone.isCompletedExceptionally(), is(true));
+  }
+
+  /** A function that throws on the last element fails the resulting future. */
+  @Test
+  public void lastThrows() {
+    CompletableFuture<Void> allDone =
+        applySequentially(Arrays.asList("a", "b", "c"), throwing("c"));
+
+    assertThat(allDone.isCompletedExceptionally(), is(true));
+  }
+
+  /** One million elements, each mapped to an already completed future, can be processed. */
+  @Test
+  public void bigList() {
+    Iterator<String> lotsOfStrings =
+        IntStream.range(0, 1_000_000).mapToObj(String::valueOf).iterator();
+    // single-use Iterable: it hands out the same iterator on every call.
+    Iterable<String> input = () -> lotsOfStrings;
+
+    // no element equals null, so this function never throws.
+    CompletableFuture<Void> allDone = applySequentially(input, throwing(null));
+
+    allDone.join();
+  }
+
+  /**
+   * Creates a function that throws an {@link IllegalStateException} for the element equal to
+   * {@code when}, and returns an already completed future for every other element.
+   *
+   * @param when the element to fail on; {@code null} matches nothing.
+   * @return the function described above.
+   */
+  private static MoreFutures.Fn<String, CompletableFuture<Void>> throwing(String when) {
+    return letter -> {
+      if (letter.equals(when)) {
+        throw new IllegalStateException("I'm a throwing function");
+      }
+      return CompletableFuture.completedFuture(null);
+    };
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java
new file mode 100644
index 0000000..43cb4f1
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/handler/TestUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.handler;
+
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.SliceProtobufUtil;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.reqreply.generated.Address;
+import org.apache.flink.statefun.sdk.reqreply.generated.FromFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+
+/** Helpers for building request-reply protocol messages, and matchers on them, in tests. */
+public class TestUtils {
+  private TestUtils() {}
+
+  /**
+   * Creates a matcher for a {@code MODIFY} mutation that sets the given state to the given value.
+   *
+   * @param spec the state whose name and type are used.
+   * @param newValue the value the state is expected to be modified to.
+   * @return a matcher that is equal to exactly that mutation.
+   */
+  public static <T> Matcher<FromFunction.PersistedValueMutation> modifiedValue(
+      ValueSpec<T> spec, T newValue) {
+    final FromFunction.PersistedValueMutation.Builder expected =
+        FromFunction.PersistedValueMutation.newBuilder();
+    expected.setStateName(spec.name());
+    expected.setMutationType(FromFunction.PersistedValueMutation.MutationType.MODIFY);
+    expected.setStateValue(typedValue(spec.type(), newValue));
+
+    return Matchers.is(expected.build());
+  }
+
+  /** Builds the Protobuf form of the given value spec, as produced by {@link ProtoUtils}. */
+  public static FromFunction.PersistedValueSpec protoFromValueSpec(ValueSpec<?> spec) {
+    return ProtoUtils.protoFromValueSpec(spec).build();
+  }
+
+  /**
+   * Serializes a value with the serializer of its type and wraps the result in a {@code
+   * TypedValue} builder carrying the type's name.
+   *
+   * @param type the type that provides the type name and the serializer.
+   * @param value the value to serialize; it is handed to the serializer as is.
+   * @return a builder with the type name, the has-value flag and the serialized bytes set.
+   */
+  public static <T> TypedValue.Builder typedValue(Type<T> type, T value) {
+    final Slice bytes = type.typeSerializer().serialize(value);
+
+    final TypedValue.Builder typed = TypedValue.newBuilder();
+    typed.setTypename(type.typeName().asTypeNameString());
+    typed.setHasValue(value != null);
+    typed.setValue(SliceProtobufUtil.asByteString(bytes));
+    return typed;
+  }
+
+  /** A test utility to build ToFunction messages using SDK concepts. */
+  public static final class RequestBuilder {
+
+    private final ToFunction.InvocationBatchRequest.Builder builder =
+        ToFunction.InvocationBatchRequest.newBuilder();
+
+    /** Sets the address of the function that the batch is sent to. */
+    public RequestBuilder withTarget(TypeName target, String id) {
+      final Address.Builder address = Address.newBuilder();
+      address.setNamespace(target.namespace());
+      address.setType(target.name());
+      address.setId(id);
+
+      builder.setTarget(address);
+      return this;
+    }
+
+    /** Adds a state entry that carries only the name of the state, without a value. */
+    public <T> RequestBuilder withState(ValueSpec<T> spec) {
+      final ToFunction.PersistedValue.Builder state = ToFunction.PersistedValue.newBuilder();
+      state.setStateName(spec.name());
+
+      builder.addState(state);
+      return this;
+    }
+
+    /** Adds a state entry that carries the name of the state and the given value. */
+    public <T> RequestBuilder withState(ValueSpec<T> spec, T value) {
+      final ToFunction.PersistedValue.Builder state = ToFunction.PersistedValue.newBuilder();
+      state.setStateName(spec.name());
+      state.setStateValue(typedValue(spec.type(), value));
+
+      builder.addState(state);
+      return this;
+    }
+
+    /** Appends an invocation whose argument is the given value, serialized with its type. */
+    public <T> RequestBuilder withInvocation(Type<T> type, T value) {
+      final ToFunction.Invocation.Builder invocation = ToFunction.Invocation.newBuilder();
+      invocation.setArgument(typedValue(type, value));
+
+      builder.addInvocations(invocation);
+      return this;
+    }
+
+    /** Wraps everything collected so far into a {@code ToFunction} message. */
+    public ToFunction build() {
+      return ToFunction.newBuilder().setInvocation(builder).build();
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java
new file mode 100644
index 0000000..f0ba3c4
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceOutputTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import static org.junit.Assert.assertArrayEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.junit.Test;
+
+/** Tests for {@code SliceOutput}, and for copying {@code Slice}s from and to streams. */
+public class SliceOutputTest {
+
+  private static final byte[] TEST_BYTES = new byte[] {1, 2, 3, 4, 5};
+
+  /** Bytes written to the output come back unchanged from a copy of it. */
+  @Test
+  public void usageExample() {
+    SliceOutput output = SliceOutput.sliceOutput(32);
+
+    output.write(TEST_BYTES);
+    Slice slice = output.copyOf();
+
+    assertArrayEquals(TEST_BYTES, slice.toByteArray());
+  }
+
+  /** An output created with an initial size of zero still accepts writes. */
+  @Test
+  public void sliceShouldAutoGrow() {
+    SliceOutput sliceOutput = SliceOutput.sliceOutput(0);
+
+    sliceOutput.write(TEST_BYTES);
+    Slice slice = sliceOutput.copyOf();
+    byte[] got = new byte[slice.readableBytes()];
+    slice.copyTo(got);
+
+    // expected value first, so that a failure message labels the two arrays correctly.
+    assertArrayEquals(TEST_BYTES, got);
+  }
+
+  /** Copying into a target array at offset 1 leaves the content right after the first byte. */
+  @Test
+  public void writeStartingAtOffset() {
+    SliceOutput output = SliceOutput.sliceOutput(32);
+
+    output.write(TEST_BYTES);
+    Slice slice = output.copyOf();
+
+    byte[] slightlyBiggerBuf = new byte[1 + slice.readableBytes()];
+    slice.copyTo(slightlyBiggerBuf, 1);
+
+    byte[] got = deleteFirstByte(slightlyBiggerBuf);
+    assertArrayEquals(TEST_BYTES, got);
+  }
+
+  /** Random buffers written in random consecutive chunks are reassembled unchanged. */
+  @Test
+  public void randomizedTest() {
+    ThreadLocalRandom random = ThreadLocalRandom.current();
+    for (int i = 0; i < 1_000_000; i++) {
+      // create a random buffer of a random size.
+      final int size = random.nextInt(0, 32);
+      byte[] buf = randomBuffer(random, size);
+      // write the buffer in random chunks
+      SliceOutput sliceOutput = SliceOutput.sliceOutput(0);
+      for (OffsetLenPair chunk : randomChunks(random, size)) {
+        sliceOutput.write(buf, chunk.offset, chunk.len);
+      }
+      // verify
+      Slice slice = sliceOutput.copyOf();
+      assertArrayEquals(buf, slice.toByteArray());
+    }
+  }
+
+  /** A slice copied from an input stream holds all of the bytes of the stream. */
+  @Test
+  public void copyFromInputStreamRandomizedTest() {
+    byte[] expected = randomBuffer(ThreadLocalRandom.current(), 256);
+    ByteArrayInputStream input = new ByteArrayInputStream(expected);
+
+    Slice slice = Slices.copyOf(input, 0);
+
+    byte[] actual = slice.toByteArray();
+
+    assertArrayEquals(expected, actual);
+  }
+
+  /** A slice copied to an output stream writes all of its bytes to the stream. */
+  @Test
+  public void toOutputStreamTest() {
+    byte[] expected = randomBuffer(ThreadLocalRandom.current(), 256);
+
+    Slice slice = Slices.wrap(expected);
+
+    ByteArrayOutputStream output = new ByteArrayOutputStream(expected.length);
+    slice.copyTo(output);
+
+    assertArrayEquals(expected, output.toByteArray());
+  }
+
+  /** Returns a copy of the given array without its first element. */
+  private static byte[] deleteFirstByte(byte[] slightlyBiggerBuf) {
+    return Arrays.copyOfRange(slightlyBiggerBuf, 1, slightlyBiggerBuf.length);
+  }
+
+  /** Returns a new array of the given size filled with random bytes. */
+  private static byte[] randomBuffer(ThreadLocalRandom random, int size) {
+    byte[] buf = new byte[size];
+    random.nextBytes(buf);
+    return buf;
+  }
+
+  /**
+   * Compute a random partition of {@code size} into consecutive pairs of (offset, len). A pair
+   * may have a length of zero, and the lengths of all the pairs add up to {@code size}.
+   */
+  private static List<OffsetLenPair> randomChunks(ThreadLocalRandom random, int size) {
+    ArrayList<OffsetLenPair> chunks = new ArrayList<>();
+    int offset = 0;
+    while (offset != size) {
+      int remaining = size - offset;
+      // the bound is exclusive, hence the +1 to allow taking all of the remaining bytes at once.
+      int toWrite = random.nextInt(remaining + 1);
+      chunks.add(new OffsetLenPair(offset, toWrite));
+      offset += toWrite;
+    }
+    return chunks;
+  }
+
+  /** A chunk of a buffer: {@code len} bytes starting at {@code offset}. */
+  private static final class OffsetLenPair {
+    final int offset;
+    final int len;
+
+    public OffsetLenPair(int offset, int len) {
+      this.offset = offset;
+      this.len = len;
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java
new file mode 100644
index 0000000..85f21c2
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/slice/SliceProtobufUtilTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.slice;
+
+import static org.junit.Assert.assertSame;
+
+import com.google.protobuf.ByteString;
+import org.junit.Test;
+
+/** Shows that a ByteString converted to a Slice and back is returned as the same reference. */
+public class SliceProtobufUtilTest {
+  @Test
+  public void usageExample() {
+    final ByteString original = ByteString.copyFromUtf8("Hello world");
+    // convert the ByteString to a Slice, then convert that Slice back to a ByteString.
+    final ByteString roundTripped =
+        SliceProtobufUtil.asByteString(SliceProtobufUtil.asSlice(original));
+
+    assertSame("Expecting the same reference.", original, roundTripped);
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java
new file mode 100644
index 0000000..0afaa02
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/ConcurrentAddressScopedStorageTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+import com.google.protobuf.ByteString;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import org.apache.flink.statefun.sdk.java.AddressScopedStorage;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.junit.Test;
+
+public class ConcurrentAddressScopedStorageTest {
+
+  @Test
+  public void exampleUsage() {
+    final ValueSpec<Integer> stateSpec1 = ValueSpec.named("state-1").withIntType();
+    final ValueSpec<Boolean> stateSpec2 = ValueSpec.named("state-2").withBooleanType();
+    // seed the storage with one protocol value per spec.
+    final List<StateValueContext<?>> testStateValues =
+        testStateValues(stateValue(stateSpec1, 91), stateValue(stateSpec2, true));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+    // each spec must read back the value it was seeded with.
+    assertThat(storage.get(stateSpec1), is(Optional.of(91)));
+    assertThat(storage.get(stateSpec2), is(Optional.of(true)));
+  }
+
+  @Test
+  public void getNullValueCell() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+    // a null value makes the stateValue helper build a protocol value with hasValue = false.
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, null));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+    // such a cell is expected to read as empty.
+    assertThat(storage.get(stateSpec), is(Optional.empty()));
+  }
+
+  @Test
+  public void setCell() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.set(stateSpec, 1111);
+    // a read after the write must observe the new value, not the seeded 91.
+    assertThat(storage.get(stateSpec), is(Optional.of(1111)));
+  }
+
+  @Test
+  public void setMutableTypeCell() {
+    final ValueSpec<TestMutableType.Type> stateSpec =
+        ValueSpec.named("state").withCustomType(new TestMutableType());
+
+    final List<StateValueContext<?>> testStateValues =
+        testStateValues(stateValue(stateSpec, new TestMutableType.Type("hello")));
+
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+    final TestMutableType.Type newValue = new TestMutableType.Type("hello again!");
+    storage.set(stateSpec, newValue);
+
+    // mutations after a set should not have any effect
+    newValue.mutate("this value should not be written to storage!");
+    assertThat(storage.get(stateSpec), is(Optional.of(new TestMutableType.Type("hello again!"))));
+  }
+
+  @Test
+  public void clearCell() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.remove(stateSpec);
+    // a removed cell is expected to read as empty.
+    assertThat(storage.get(stateSpec), is(Optional.empty()));
+  }
+
+  @Test
+  public void clearMutableTypeCell() {
+    final ValueSpec<TestMutableType.Type> stateSpec =
+        ValueSpec.named("state").withCustomType(new TestMutableType());
+
+    List<StateValueContext<?>> testStateValues =
+        testStateValues(stateValue(stateSpec, new TestMutableType.Type("hello")));
+
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+
+    storage.remove(stateSpec);
+    // removal is expected to read as empty for a custom type as well.
+    assertThat(storage.get(stateSpec), is(Optional.empty()));
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void getNonExistingCell() {
+    final AddressScopedStorage storage =
+        new ConcurrentAddressScopedStorage(Collections.emptyList());
+    // the storage was built without any cells, so this read is expected to throw.
+    storage.get(ValueSpec.named("doesn't-exist").withIntType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setNonExistingCell() {
+    final AddressScopedStorage storage =
+        new ConcurrentAddressScopedStorage(Collections.emptyList());
+    // the storage was built without any cells, so this write is expected to throw.
+    storage.set(ValueSpec.named("doesn't-exist").withIntType(), 999);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void clearNonExistingCell() {
+    final AddressScopedStorage storage =
+        new ConcurrentAddressScopedStorage(Collections.emptyList());
+    // the storage was built without any cells, so this removal is expected to throw.
+    storage.remove(ValueSpec.named("doesn't-exist").withIntType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setToNull() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+    // writing null to an existing cell is expected to throw.
+    storage.set(stateSpec, null);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void getWithWrongType() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+    // same state name, but the spec declares a boolean instead of the seeded int type.
+    storage.get(ValueSpec.named("state").withBooleanType());
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void setWithWrongType() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+    // same state name, but the spec declares a boolean instead of the seeded int type.
+    storage.set(ValueSpec.named("state").withBooleanType(), true);
+  }
+
+  @Test(expected = IllegalStorageAccessException.class)
+  public void clearWithWrongType() {
+    final ValueSpec<Integer> stateSpec = ValueSpec.named("state").withIntType();
+
+    final List<StateValueContext<?>> testStateValues = testStateValues(stateValue(stateSpec, 91));
+    final AddressScopedStorage storage = new ConcurrentAddressScopedStorage(testStateValues);
+    // same state name, but the spec declares a boolean instead of the seeded int type.
+    storage.remove(ValueSpec.named("state").withBooleanType());
+  }
+  /** Wraps the given state values in a fixed-size list (see {@code Arrays.asList}). */
+  private static List<StateValueContext<?>> testStateValues(StateValueContext<?>... testValues) {
+    return Arrays.asList(testValues);
+  }
+  /** Pairs {@code spec} with a protocol value holding the serialized {@code value}, if any. */
+  private static <T> StateValueContext<T> stateValue(ValueSpec<T> spec, T value) {
+    final ToFunction.PersistedValue protocolValue =
+        ToFunction.PersistedValue.newBuilder()
+            .setStateName(spec.name())
+            .setStateValue(
+                TypedValue.newBuilder()
+                    .setTypename(spec.type().typeName().asTypeNameString())
+                    .setHasValue(value != null)
+                    .setValue(toByteString(spec.type(), value)))
+            .build();
+
+    return new StateValueContext<>(spec, protocolValue);
+  }
+  /** Serializes {@code value} with the given type; {@code null} yields ByteString.EMPTY. */
+  private static <T> ByteString toByteString(Type<T> type, T value) {
+    if (value == null) {
+      return ByteString.EMPTY;
+    }
+    return ByteString.copyFrom(type.typeSerializer().serialize(value).toByteArray());
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java
new file mode 100644
index 0000000..a9c3902
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/StateValueContextsTest.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import static org.apache.flink.statefun.sdk.java.storage.StateValueContexts.StateValueContext;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsCollectionContaining.hasItem;
+
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.ValueSpec;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.Types;
+import org.apache.flink.statefun.sdk.reqreply.generated.ToFunction;
+import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.Matchers;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.hamcrest.TypeSafeMatcher;
+import org.junit.Test;
+
+public final class StateValueContextsTest {
+  @Test
+  public void exampleUsage() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(2);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+    registeredSpecs.put("state-2", ValueSpec.named("state-2").withBooleanType());
+
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(2);
+    providedProtocolValues.add(protocolValue("state-1", Types.integerType(), 66));
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+
+    final List<StateValueContext<?>> resolvedStateValues =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).resolved();
+    // every registered spec was given a value, so both are expected among the resolved ones.
+    assertThat(resolvedStateValues.size(), is(2));
+    assertThat(resolvedStateValues, hasItem(stateValueContextNamed("state-1")));
+    assertThat(resolvedStateValues, hasItem(stateValueContextNamed("state-2")));
+  }
+
+  @Test
+  public void missingProtocolValues() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(3);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+    registeredSpecs.put("state-2", ValueSpec.named("state-2").withBooleanType());
+    registeredSpecs.put("state-3", ValueSpec.named("state-3").withUtf8String());
+
+    // only value for state-2 was provided
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(1);
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+
+    final List<ValueSpec<?>> statesWithMissingValue =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).missingValues();
+    // the two registered specs without a provided value are expected to be reported as missing.
+    assertThat(statesWithMissingValue.size(), is(2));
+    assertThat(statesWithMissingValue, hasItem(valueSpec("state-1", Types.integerType())));
+    assertThat(statesWithMissingValue, hasItem(valueSpec("state-3", Types.stringType())));
+  }
+
+  @Test
+  public void extraProtocolValues() {
+    final Map<String, ValueSpec<?>> registeredSpecs = new HashMap<>(1);
+    registeredSpecs.put("state-1", ValueSpec.named("state-1").withIntType());
+    // a few extra states were provided, and should be ignored
+    final List<ToFunction.PersistedValue> providedProtocolValues = new ArrayList<>(3);
+    providedProtocolValues.add(protocolValue("state-1", Types.integerType(), 66));
+    providedProtocolValues.add(protocolValue("state-2", Types.booleanType(), true));
+    providedProtocolValues.add(protocolValue("state-3", Types.stringType(), "ignore me!"));
+
+    final List<StateValueContext<?>> resolvedStateValues =
+        StateValueContexts.resolve(registeredSpecs, providedProtocolValues).resolved();
+
+    assertThat(resolvedStateValues.size(), is(1));
+    assertThat(resolvedStateValues.get(0).spec().name(), Matchers.is("state-1"));
+  }
+  /** Builds a protocol-level persisted value named {@code stateName} carrying {@code value}. */
+  private static <T> ToFunction.PersistedValue protocolValue(
+      String stateName, Type<T> type, T value) {
+    return ToFunction.PersistedValue.newBuilder()
+        .setStateName(stateName)
+        .setStateValue(
+            TypedValue.newBuilder()
+                .setTypename(type.typeName().asTypeNameString())
+                .setHasValue(value != null)
+                .setValue(toByteString(type, value)))
+        .build();
+  }
+  /** Serializes {@code value} with the given type; {@code null} yields ByteString.EMPTY. */
+  private static <T> ByteString toByteString(Type<T> type, T value) {
+    if (value == null) {
+      return ByteString.EMPTY;
+    }
+    return ByteString.copyFrom(type.typeSerializer().serialize(value).toByteArray());
+  }
+  /** Matches a ValueSpec by state name and by the runtime class of its type. */
+  private static <T> Matcher<ValueSpec<T>> valueSpec(String stateName, Type<T> type) {
+    return new TypeSafeMatcher<ValueSpec<T>>() {
+      @Override
+      protected boolean matchesSafely(ValueSpec<T> testSpec) {
+        return testSpec.type().getClass() == type.getClass() && testSpec.name().equals(stateName);
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("A ValueSpec named ").appendText(stateName);
+        description.appendText(" of type ").appendText(type.getClass().getName());
+      }
+    };
+  }
+  /** Matches a StateValueContext by its spec name; a mismatch reports the actual name. */
+  private static <T> Matcher<StateValueContext<T>> stateValueContextNamed(String name) {
+    return new TypeSafeDiagnosingMatcher<StateValueContext<T>>() {
+      @Override
+      protected boolean matchesSafely(StateValueContext<T> ctx, Description description) {
+        if (!Objects.equals(ctx.spec().name(), name)) {
+          description.appendText(ctx.spec().name());
+          return false;
+        }
+        return true;
+      }
+
+      @Override
+      public void describeTo(Description description) {
+        description.appendText("A StateValueContext named ").appendText(name);
+      }
+    };
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java
new file mode 100644
index 0000000..6c31b00
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/storage/TestMutableType.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.sdk.java.storage;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import org.apache.flink.statefun.sdk.java.TypeName;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.java.types.Type;
+import org.apache.flink.statefun.sdk.java.types.TypeSerializer;
+
+/** A test-only type whose Java value object can be mutated after it has been created. */
+public class TestMutableType implements Type<TestMutableType.Type> {
+  @Override
+  public TypeName typeName() {
+    return TypeName.typeNameOf("test", "my-mutable-type");
+  }
+
+  @Override
+  public TypeSerializer<TestMutableType.Type> typeSerializer() {
+    return new Serializer();
+  }
+
+  /** Mutable holder of a single string; equality and hashing are based on that string. */
+  public static class Type {
+    private String value;
+
+    public Type(String value) {
+      this.value = value;
+    }
+
+    public void mutate(String newValue) {
+      this.value = newValue;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return this == o
+          || (o != null && getClass() == o.getClass() && Objects.equals(value, ((Type) o).value));
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(value);
+    }
+  }
+
+  /** Writes the held string as UTF-8 bytes, and reads it back from UTF-8 bytes. */
+  private static class Serializer implements TypeSerializer<TestMutableType.Type> {
+    @Override
+    public Slice serialize(Type value) {
+      return Slices.wrap(value.value.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @Override
+    public Type deserialize(Slice bytes) {
+      return new Type(new String(bytes.toByteArray(), StandardCharsets.UTF_8));
+    }
+  }
+}
diff --git a/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java
new file mode 100644
index 0000000..90f4ca1
--- /dev/null
+++ b/statefun-sdk-java/src/test/java/org/apache/flink/statefun/sdk/java/types/SanityPrimitiveTypeTest.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.statefun.sdk.java.types;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.flink.statefun.sdk.java.slice.Slice;
+import org.apache.flink.statefun.sdk.java.slice.Slices;
+import org.apache.flink.statefun.sdk.types.generated.BooleanWrapper;
+import org.apache.flink.statefun.sdk.types.generated.IntWrapper;
+import org.apache.flink.statefun.sdk.types.generated.LongWrapper;
+import org.apache.flink.statefun.sdk.types.generated.StringWrapper;
+import org.junit.Ignore;
+import org.junit.Test;
+
+public class SanityPrimitiveTypeTest {
+
+  @Test
+  public void testBoolean() {
+    assertRoundTrip(Types.booleanType(), Boolean.TRUE);
+    assertRoundTrip(Types.booleanType(), Boolean.FALSE);
+  }
+
+  @Test
+  public void testInt() {
+    assertRoundTrip(Types.integerType(), 1);
+    assertRoundTrip(Types.integerType(), 1048576);
+    assertRoundTrip(Types.integerType(), Integer.MIN_VALUE);
+    assertRoundTrip(Types.integerType(), Integer.MAX_VALUE);
+    assertRoundTrip(Types.integerType(), -1);
+  }
+
+  @Test
+  public void testLong() {
+    assertRoundTrip(Types.longType(), -1L);
+    assertRoundTrip(Types.longType(), 0L);
+    assertRoundTrip(Types.longType(), Long.MIN_VALUE);
+    assertRoundTrip(Types.longType(), Long.MAX_VALUE);
+  }
+
+  @Test
+  public void testFloat() {
+    assertRoundTrip(Types.floatType(), Float.MIN_VALUE);
+    assertRoundTrip(Types.floatType(), Float.MAX_VALUE);
+    assertRoundTrip(Types.floatType(), 2.1459f);
+    assertRoundTrip(Types.floatType(), -1e-4f);
+  }
+
+  @Test
+  public void testDouble() {
+    assertRoundTrip(Types.doubleType(), Double.MIN_VALUE);
+    assertRoundTrip(Types.doubleType(), Double.MAX_VALUE);
+    assertRoundTrip(Types.doubleType(), 2.1459d);
+    assertRoundTrip(Types.doubleType(), -1e-4d);
+  }
+
+  @Test
+  public void testString() {
+    assertRoundTrip(Types.stringType(), "");
+    assertRoundTrip(Types.stringType(), "This is a string");
+  }
+
+  @Test
+  public void testRandomCompatibilityWithAnIntegerWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<Integer> serializer = Types.integerType().typeSerializer();
+    // despite "Random" in the name, the values are the ints 0..999_999 walked in order.
+    for (int i = 0; i < 1_000_000; i++) {
+      testCompatibilityWithWrapper(
+          serializer, IntWrapper::parseFrom, IntWrapper::getValue, IntWrapper::toByteArray, i);
+    }
+  }
+
+  @Test
+  public void testCompatibilityWithABooleanWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<Boolean> serializer = Types.booleanType().typeSerializer();
+    testCompatibilityWithWrapper(
+        serializer,
+        BooleanWrapper::parseFrom,
+        BooleanWrapper::getValue,
+        BooleanWrapper::toByteArray,
+        true);
+
+    testCompatibilityWithWrapper(
+        serializer,
+        BooleanWrapper::parseFrom,
+        BooleanWrapper::getValue,
+        BooleanWrapper::toByteArray,
+        false);
+  }
+
+  @Test
+  public void testRandomCompatibilityWithALongWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<Long> serializer = Types.longType().typeSerializer();
+    for (long i = 0; i < 1_000_000; i++) {
+      testCompatibilityWithWrapper(
+          serializer, LongWrapper::parseFrom, LongWrapper::getValue, LongWrapper::toByteArray, i);
+    }
+  }
+
+  /** Exhaustive variant over every {@code int}; ignored by default, presumably as too slow. */
+  @Ignore
+  @Test
+  public void testCompatibilityWithAnIntegerWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<Integer> serializer = Types.integerType().typeSerializer();
+    // a long counter lets the loop reach Integer.MAX_VALUE itself without overflowing.
+    for (long value = Integer.MIN_VALUE; value <= Integer.MAX_VALUE; value++) {
+      testCompatibilityWithWrapper(
+          serializer,
+          IntWrapper::parseFrom,
+          IntWrapper::getValue,
+          IntWrapper::toByteArray,
+          (int) value);
+    }
+  }
+
+  @Test
+  public void testRandomCompatibilityWithStringWrapper() throws InvalidProtocolBufferException {
+    TypeSerializer<String> serializer = Types.stringType().typeSerializer();
+    ThreadLocalRandom random = ThreadLocalRandom.current();
+    for (int i = 0; i < 1_000; i++) {
+      int n = random.nextInt(4096);
+      byte[] buf = new byte[n];
+      random.nextBytes(buf);
+      // decoding replaces malformed byte sequences, so expected need not match buf byte for byte.
+      String expected = new String(buf, StandardCharsets.UTF_8);
+
+      testCompatibilityWithWrapper(
+          serializer,
+          StringWrapper::parseFrom,
+          StringWrapper::getValue,
+          StringWrapper::toByteArray,
+          expected);
+    }
+  }
+
+  /** Like {@code java.util.function.Function}, but may throw InvalidProtocolBufferException. */
+  @FunctionalInterface
+  interface Fn<I, O> {
+    O apply(I input) throws InvalidProtocolBufferException;
+  }
+
+  /**
+   * Checks that {@code serializer} is wire compatible with the protobuf wrapper message {@code W}:
+   * each side parses the other's bytes, and both emit identical bytes for {@code expected}.
+   */
+  private static <T, W> void testCompatibilityWithWrapper(
+      TypeSerializer<T> serializer,
+      Fn<ByteBuffer, W> parseFrom,
+      Fn<W, T> getValue,
+      Fn<W, byte[]> toByteArray,
+      T expected)
+      throws InvalidProtocolBufferException {
+    // 1. round trip through our own serializer.
+    final Slice serialized = serializer.serialize(expected);
+    final T got = serializer.deserialize(serialized);
+    assertEquals(expected, got);
+    // 2. protobuf must be able to parse what we wrote.
+    final W wrapper = parseFrom.apply(serialized.asReadOnlyByteBuffer());
+    assertEquals(expected, getValue.apply(wrapper));
+    // 3. we must be able to parse what protobuf wrote.
+    final Slice serializedByPb = Slices.wrap(toByteArray.apply(wrapper));
+    final T gotPb = serializer.deserialize(serializedByPb);
+    assertEquals(expected, gotPb);
+    // 4. the protobuf byte representation must be equal to ours.
+    assertEquals(serializedByPb.asReadOnlyByteBuffer(), serialized.asReadOnlyByteBuffer());
+  }
+
+  /** Asserts that {@code element} survives a serialize/deserialize round trip unchanged. */
+  public <T> void assertRoundTrip(Type<T> type, T element) {
+    // typeSerializer() is called once per direction, rather than reusing one serializer variable.
+    final Slice slice = type.typeSerializer().serialize(element);
+    final T got = type.typeSerializer().deserialize(slice);
+    assertEquals(element, got);
+  }
+}