You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2021/12/27 15:23:18 UTC

[flink-statefun] branch master updated (9989275 -> fe3c266)

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git.


    from 9989275  [hotfix] Add k8s HA to helm chart
     new 51d3130  [FLINK-25197] Fix serialization issue in RequestReplyFunctionBuilder
     new fe3c266  [hotfix] Move junit and hamcrest-all test dependency to dependencyManagement

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 pom.xml                                            |  17 ++++
 .../statefun-e2e-tests-common/pom.xml              |   7 ++
 .../statefun-smoke-e2e-common/pom.xml              |   7 ++
 .../statefun-smoke-e2e-multilang-harness/pom.xml   |   2 -
 statefun-flink/statefun-flink-common/pom.xml       |   4 -
 .../flink/common/json/StateFunObjectMapper.java    |  16 +++-
 statefun-flink/statefun-flink-core/pom.xml         |   4 -
 .../DefaultHttpRequestReplyClientFactory.java      |   3 +-
 .../httpfn/DefaultHttpRequestReplyClientSpec.java  |  13 +++
 .../DefaultHttpRequestReplyClientSpecTest.java     | 105 +++++++++++++++++++++
 statefun-flink/statefun-flink-datastream/pom.xml   |  12 ++-
 .../datastream/RequestReplyFunctionBuilder.java    |   6 +-
 .../RequestReplyFunctionBuilderTest.java}          |  27 +++---
 statefun-flink/statefun-flink-io-bundle/pom.xml    |   4 -
 .../statefun-flink-state-processor/pom.xml         |   4 -
 statefun-kafka-io/pom.xml                          |   4 -
 statefun-kinesis-io/pom.xml                        |   2 -
 statefun-sdk-embedded/pom.xml                      |   4 -
 statefun-sdk-java/pom.xml                          |   4 -
 statefun-testutil/pom.xml                          |   3 -
 20 files changed, 191 insertions(+), 57 deletions(-)
 create mode 100644 statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java
 copy statefun-flink/{statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/UrlPathTemplate.java => statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java} (59%)

[flink-statefun] 02/02: [hotfix] Move junit and hamcrest-all test dependency to dependencyManagement

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit fe3c266fa9f2105efd090f44e36bb4eb4d179ab3
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Mon Dec 27 15:36:52 2021 +0100

    [hotfix] Move junit and hamcrest-all test dependency to dependencyManagement
    
    This commit sets the version and scope of the junit and hamcrest-all test dependencies in
    the dependencyManagement section of the root pom to unify these dependencies across
    modules. The purpose is to avoid the usage of different testing libraries.
---
 pom.xml                                                 | 17 +++++++++++++++++
 statefun-e2e-tests/statefun-e2e-tests-common/pom.xml    |  7 +++++++
 statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml    |  7 +++++++
 .../statefun-smoke-e2e-multilang-harness/pom.xml        |  2 --
 statefun-flink/statefun-flink-common/pom.xml            |  4 ----
 statefun-flink/statefun-flink-core/pom.xml              |  4 ----
 statefun-flink/statefun-flink-datastream/pom.xml        |  4 ----
 statefun-flink/statefun-flink-io-bundle/pom.xml         |  4 ----
 statefun-flink/statefun-flink-state-processor/pom.xml   |  4 ----
 statefun-kafka-io/pom.xml                               |  4 ----
 statefun-kinesis-io/pom.xml                             |  2 --
 statefun-sdk-embedded/pom.xml                           |  4 ----
 statefun-sdk-java/pom.xml                               |  4 ----
 statefun-testutil/pom.xml                               |  3 ---
 14 files changed, 31 insertions(+), 39 deletions(-)

diff --git a/pom.xml b/pom.xml
index 7f74819..c221c1c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,6 +92,23 @@ under the License.
         </dependency>
     </dependencies>
 
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>junit</groupId>
+                <artifactId>junit</artifactId>
+                <version>4.12</version>
+                <scope>test</scope>
+            </dependency>
+            <dependency>
+                <groupId>org.hamcrest</groupId>
+                <artifactId>hamcrest-all</artifactId>
+                <version>1.3</version>
+                <scope>test</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
 
     <profiles>
         <!--
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
index a203e0b..ac3abcb 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/pom.xml
@@ -113,5 +113,12 @@ under the License.
                 </exclusion>
             </exclusions>
         </dependency>
+
+        <!-- Junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>compile</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
index d60d6d2..6e7c54d 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
@@ -46,6 +46,13 @@ under the License.
             </exclusions>
         </dependency>
 
+        <!-- Junit -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>compile</scope>
+        </dependency>
+
         <!-- Protobuf Commands messages -->
         <dependency>
             <groupId>com.google.protobuf</groupId>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
index 6e69f73..cc7b683 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
@@ -59,8 +59,6 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/statefun-flink/statefun-flink-common/pom.xml b/statefun-flink/statefun-flink-common/pom.xml
index 1e1a576..28655d6 100644
--- a/statefun-flink/statefun-flink-common/pom.xml
+++ b/statefun-flink/statefun-flink-common/pom.xml
@@ -62,14 +62,10 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/statefun-flink/statefun-flink-core/pom.xml b/statefun-flink/statefun-flink-core/pom.xml
index 790240c..5c79728 100644
--- a/statefun-flink/statefun-flink-core/pom.xml
+++ b/statefun-flink/statefun-flink-core/pom.xml
@@ -102,14 +102,10 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
diff --git a/statefun-flink/statefun-flink-datastream/pom.xml b/statefun-flink/statefun-flink-datastream/pom.xml
index 2679286..ed93b56 100644
--- a/statefun-flink/statefun-flink-datastream/pom.xml
+++ b/statefun-flink/statefun-flink-datastream/pom.xml
@@ -82,14 +82,10 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
         </dependency>
         
     </dependencies>
diff --git a/statefun-flink/statefun-flink-io-bundle/pom.xml b/statefun-flink/statefun-flink-io-bundle/pom.xml
index 46270b4..a6b96a9 100644
--- a/statefun-flink/statefun-flink-io-bundle/pom.xml
+++ b/statefun-flink/statefun-flink-io-bundle/pom.xml
@@ -103,14 +103,10 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/statefun-flink/statefun-flink-state-processor/pom.xml b/statefun-flink/statefun-flink-state-processor/pom.xml
index 964ba02..efc0eca 100644
--- a/statefun-flink/statefun-flink-state-processor/pom.xml
+++ b/statefun-flink/statefun-flink-state-processor/pom.xml
@@ -63,8 +63,6 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
 
         <dependency>
@@ -90,8 +88,6 @@ under the License.
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/statefun-kafka-io/pom.xml b/statefun-kafka-io/pom.xml
index 65183cb..9ea9176 100644
--- a/statefun-kafka-io/pom.xml
+++ b/statefun-kafka-io/pom.xml
@@ -57,14 +57,10 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
diff --git a/statefun-kinesis-io/pom.xml b/statefun-kinesis-io/pom.xml
index eeeec34..4bfbf35 100644
--- a/statefun-kinesis-io/pom.xml
+++ b/statefun-kinesis-io/pom.xml
@@ -39,8 +39,6 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/statefun-sdk-embedded/pom.xml b/statefun-sdk-embedded/pom.xml
index 12fc603..90a903c 100644
--- a/statefun-sdk-embedded/pom.xml
+++ b/statefun-sdk-embedded/pom.xml
@@ -42,14 +42,10 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/statefun-sdk-java/pom.xml b/statefun-sdk-java/pom.xml
index a9754b5..eaf1931 100644
--- a/statefun-sdk-java/pom.xml
+++ b/statefun-sdk-java/pom.xml
@@ -39,14 +39,10 @@ under the License.
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/statefun-testutil/pom.xml b/statefun-testutil/pom.xml
index c4ff0ee..9207363 100644
--- a/statefun-testutil/pom.xml
+++ b/statefun-testutil/pom.xml
@@ -36,14 +36,11 @@ under the License.
         <dependency>
             <groupId>org.hamcrest</groupId>
             <artifactId>hamcrest-all</artifactId>
-            <version>1.3</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.12</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 

[flink-statefun] 01/02: [FLINK-25197] Fix serialization issue in RequestReplyFunctionBuilder

Posted by tr...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-statefun.git

commit 51d3130a172a40ed1dfda9269292b08054534c64
Author: Galen Warren <78...@users.noreply.github.com>
AuthorDate: Sun Dec 26 11:24:23 2021 -0500

    [FLINK-25197] Fix serialization issue in RequestReplyFunctionBuilder
    
    This closes #282.
---
 .../flink/common/json/StateFunObjectMapper.java    |  16 +++-
 .../DefaultHttpRequestReplyClientFactory.java      |   3 +-
 .../httpfn/DefaultHttpRequestReplyClientSpec.java  |  13 +++
 .../DefaultHttpRequestReplyClientSpecTest.java     | 105 +++++++++++++++++++++
 statefun-flink/statefun-flink-datastream/pom.xml   |  16 +++-
 .../datastream/RequestReplyFunctionBuilder.java    |   6 +-
 .../RequestReplyFunctionBuilderTest.java           |  39 ++++++++
 7 files changed, 190 insertions(+), 8 deletions(-)

diff --git a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
index 2d3e502..49b7289 100644
--- a/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
+++ b/statefun-flink/statefun-flink-common/src/main/java/org/apache/flink/statefun/flink/common/json/StateFunObjectMapper.java
@@ -20,11 +20,9 @@ package org.apache.flink.statefun.flink.common.json;
 
 import java.io.IOException;
 import java.time.Duration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationContext;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.DeserializationFeature;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonDeserializer;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.*;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.module.SimpleModule;
 import org.apache.flink.statefun.sdk.TypeName;
 import org.apache.flink.util.TimeUtils;
@@ -36,6 +34,7 @@ public final class StateFunObjectMapper {
         new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
 
     final SimpleModule module = new SimpleModule("statefun");
+    module.addSerializer(Duration.class, new DurationJsonSerializer());
     module.addDeserializer(Duration.class, new DurationJsonDeserializer());
     module.addDeserializer(TypeName.class, new TypeNameJsonDeserializer());
 
@@ -51,6 +50,15 @@ public final class StateFunObjectMapper {
     }
   }
 
+  private static final class DurationJsonSerializer extends JsonSerializer<Duration> {
+    @Override
+    public void serialize(
+        Duration duration, JsonGenerator jsonGenerator, SerializerProvider serializerProvider)
+        throws IOException {
+      jsonGenerator.writeString(TimeUtils.formatWithHighestUnit(duration));
+    }
+  }
+
   private static final class TypeNameJsonDeserializer extends JsonDeserializer<TypeName> {
     @Override
     public TypeName deserialize(
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
index 5387c65..fafbfbe 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientFactory.java
@@ -108,8 +108,7 @@ public final class DefaultHttpRequestReplyClientFactory implements RequestReplyC
   private static DefaultHttpRequestReplyClientSpec parseTransportProperties(
       ObjectNode transportClientProperties) {
     try {
-      return OBJ_MAPPER.treeToValue(
-          transportClientProperties, DefaultHttpRequestReplyClientSpec.class);
+      return DefaultHttpRequestReplyClientSpec.fromJson(OBJ_MAPPER, transportClientProperties);
     } catch (Exception e) {
       throw new RuntimeException(
           "Unable to parse transport client properties when creating client: ", e);
diff --git a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
index 5aa3785..d01332f 100644
--- a/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
+++ b/statefun-flink/statefun-flink-core/src/main/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpec.java
@@ -22,6 +22,10 @@ import java.time.Duration;
 import java.util.Objects;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonSetter;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
 
 public final class DefaultHttpRequestReplyClientSpec {
 
@@ -39,6 +43,15 @@ public final class DefaultHttpRequestReplyClientSpec {
     return timeouts;
   }
 
+  public ObjectNode toJson(ObjectMapper objectMapper) {
+    return objectMapper.valueToTree(this);
+  }
+
+  static DefaultHttpRequestReplyClientSpec fromJson(ObjectMapper objectMapper, JsonNode jsonNode)
+      throws JsonProcessingException {
+    return objectMapper.treeToValue(jsonNode, DefaultHttpRequestReplyClientSpec.class);
+  }
+
   private static void validateTimeouts(
       Duration callTimeout, Duration connectTimeout, Duration readTimeout, Duration writeTimeout) {
 
diff --git a/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java
new file mode 100644
index 0000000..6120a27
--- /dev/null
+++ b/statefun-flink/statefun-flink-core/src/test/java/org/apache/flink/statefun/flink/core/httpfn/DefaultHttpRequestReplyClientSpecTest.java
@@ -0,0 +1,105 @@
+package org.apache.flink.statefun.flink.core.httpfn;
+
+import static org.junit.Assert.*;
+
+import java.time.Duration;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.Test;
+
+public class DefaultHttpRequestReplyClientSpecTest {
+
+  @Test
+  public void jsonSerDe() throws JsonProcessingException {
+    final Duration callTimeout = Duration.ofDays(1L);
+    final Duration connectTimeout = Duration.ofNanos(2L);
+    final Duration readTimeout = Duration.ofSeconds(3L);
+    final Duration writeTimeout = Duration.ofMillis(4L);
+
+    final DefaultHttpRequestReplyClientSpec.Timeouts timeouts =
+        new DefaultHttpRequestReplyClientSpec.Timeouts();
+    timeouts.setCallTimeout(callTimeout);
+    timeouts.setConnectTimeout(connectTimeout);
+    timeouts.setReadTimeout(readTimeout);
+    timeouts.setWriteTimeout(writeTimeout);
+
+    final DefaultHttpRequestReplyClientSpec defaultHttpRequestReplyClientSpec =
+        new DefaultHttpRequestReplyClientSpec();
+    defaultHttpRequestReplyClientSpec.setTimeouts(timeouts);
+
+    final ObjectMapper objectMapper = StateFunObjectMapper.create();
+    final ObjectNode json = defaultHttpRequestReplyClientSpec.toJson(objectMapper);
+
+    final DefaultHttpRequestReplyClientSpec deserializedHttpRequestReplyClientSpec =
+        DefaultHttpRequestReplyClientSpec.fromJson(objectMapper, json);
+
+    assertThat(deserializedHttpRequestReplyClientSpec.getTimeouts(), equalTimeouts(timeouts));
+  }
+
+  private static TypeSafeDiagnosingMatcher<DefaultHttpRequestReplyClientSpec.Timeouts>
+      equalTimeouts(DefaultHttpRequestReplyClientSpec.Timeouts timeouts) {
+    return new TimeoutsEqualityMatcher(timeouts);
+  }
+
+  private static class TimeoutsEqualityMatcher
+      extends TypeSafeDiagnosingMatcher<DefaultHttpRequestReplyClientSpec.Timeouts> {
+    private final DefaultHttpRequestReplyClientSpec.Timeouts expected;
+
+    private TimeoutsEqualityMatcher(DefaultHttpRequestReplyClientSpec.Timeouts timeouts) {
+      this.expected = timeouts;
+    }
+
+    @Override
+    protected boolean matchesSafely(
+        DefaultHttpRequestReplyClientSpec.Timeouts timeouts, Description description) {
+      boolean matching = true;
+
+      if (!timeouts.getCallTimeout().equals(expected.getCallTimeout())) {
+        description
+            .appendText("expected ")
+            .appendValue(expected.getCallTimeout())
+            .appendText(" found ")
+            .appendValue(timeouts.getCallTimeout());
+        matching = false;
+      }
+
+      if (!timeouts.getReadTimeout().equals(expected.getReadTimeout())) {
+        description
+            .appendText("expected ")
+            .appendValue(expected.getReadTimeout())
+            .appendText(" found ")
+            .appendValue(timeouts.getReadTimeout());
+        matching = false;
+      }
+
+      if (!timeouts.getWriteTimeout().equals(expected.getWriteTimeout())) {
+        description
+            .appendText("expected ")
+            .appendValue(expected.getWriteTimeout())
+            .appendText(" found ")
+            .appendValue(timeouts.getWriteTimeout());
+        matching = false;
+      }
+
+      if (!timeouts.getConnectTimeout().equals(expected.getConnectTimeout())) {
+        description
+            .appendText("expected ")
+            .appendValue(expected.getConnectTimeout())
+            .appendText(" found ")
+            .appendValue(timeouts.getConnectTimeout());
+        matching = false;
+      }
+
+      return matching;
+    }
+
+    @Override
+    public void describeTo(Description description) {
+      description.appendText("Matches equality of Timeouts");
+    }
+  }
+}
diff --git a/statefun-flink/statefun-flink-datastream/pom.xml b/statefun-flink/statefun-flink-datastream/pom.xml
index 8f5138c..2679286 100644
--- a/statefun-flink/statefun-flink-datastream/pom.xml
+++ b/statefun-flink/statefun-flink-datastream/pom.xml
@@ -53,7 +53,7 @@ under the License.
         </dependency>
   
         <!-- The following dependencies are here with scope provided, because: 
-             a) they are transitively required by the statefun-flink-* depencies
+             a) they are transitively required by the statefun-flink-* dependencies
              b) they are provided at runtime, by the embedding application. 
              
              Also note that org.slf4j:slf4j-api is excluded from all the artifacts, since maven 
@@ -77,6 +77,20 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
        </dependency>
+
+        <!-- Tests -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.12</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
         
     </dependencies>
 
diff --git a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
index 9f479eb..2875c06 100644
--- a/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
+++ b/statefun-flink/statefun-flink-datastream/src/main/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilder.java
@@ -23,6 +23,7 @@ import java.time.Duration;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
+import org.apache.flink.statefun.flink.common.json.StateFunObjectMapper;
 import org.apache.flink.statefun.flink.core.httpfn.DefaultHttpRequestReplyClientSpec;
 import org.apache.flink.statefun.flink.core.httpfn.HttpFunctionEndpointSpec;
 import org.apache.flink.statefun.flink.core.httpfn.TargetFunctions;
@@ -34,6 +35,9 @@ import org.apache.flink.statefun.sdk.FunctionType;
 /** A Builder for RequestReply remote function type. */
 public class RequestReplyFunctionBuilder {
 
+  /** The object mapper used to serialize the client spec object. */
+  private static final ObjectMapper CLIENT_SPEC_OBJ_MAPPER = StateFunObjectMapper.create();
+
   private final DefaultHttpRequestReplyClientSpec.Timeouts transportClientTimeoutsSpec =
       new DefaultHttpRequestReplyClientSpec.Timeouts();
 
@@ -130,6 +134,6 @@ public class RequestReplyFunctionBuilder {
         new DefaultHttpRequestReplyClientSpec();
     transportClientSpecPojo.setTimeouts(transportClientTimeoutsSpec);
 
-    return new ObjectMapper().valueToTree(transportClientSpecPojo);
+    return transportClientSpecPojo.toJson(CLIENT_SPEC_OBJ_MAPPER);
   }
 }
diff --git a/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java
new file mode 100644
index 0000000..63a9276
--- /dev/null
+++ b/statefun-flink/statefun-flink-datastream/src/test/java/org/apache/flink/statefun/flink/datastream/RequestReplyFunctionBuilderTest.java
@@ -0,0 +1,39 @@
+package org.apache.flink.statefun.flink.datastream;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.notNullValue;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.junit.Test;
+
+public class RequestReplyFunctionBuilderTest {
+
+  @Test
+  public void clientSpecCanBeCreated() throws URISyntaxException {
+    final RequestReplyFunctionBuilder requestReplyFunctionBuilder =
+        RequestReplyFunctionBuilder.requestReplyFunctionBuilder(
+            new FunctionType("foobar", "barfoo"), new URI("foobar"));
+
+    assertThat(requestReplyFunctionBuilder.spec(), notNullValue());
+  }
+}