You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ja...@apache.org on 2017/09/28 00:08:08 UTC
[1/4] samza git commit: SAMZA-1109;
Allow setting Serdes for fluent API operators in code
Repository: samza
Updated Branches:
refs/heads/master 987180a17 -> f16ba2692
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
index 40e5e30..2a8f039 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/PageView.java
@@ -18,32 +18,33 @@
*/
package org.apache.samza.test.operator;
+
class PageView {
- private final String userId;
- private final String country;
- private final String url;
-
- /**
- * Constructs a {@link PageView} from the provided string.
- *
- * @param message in the following CSV format - userId,country,url
- */
- PageView(String message) {
- String[] pageViewFields = message.split(",");
- userId = pageViewFields[0];
- country = pageViewFields[1];
- url = pageViewFields[2];
- }
+ private String userId;
+ private String country;
+ private String url;
- String getUserId() {
+ public String getUserId() {
return userId;
}
- String getCountry() {
+ public String getCountry() {
return country;
}
- String getUrl() {
+ public String getUrl() {
return url;
}
+
+ public void setUserId(String userId) {
+ this.userId = userId;
+ }
+
+ public void setCountry(String country) {
+ this.country = country;
+ }
+
+ public void setUrl(String url) {
+ this.url = url;
+ }
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
index 1e2acb2..261b954 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/RepartitionWindowApp.java
@@ -21,37 +21,35 @@ package org.apache.samza.test.operator;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import java.time.Duration;
-import java.util.Collection;
-import java.util.function.Function;
/**
- * A {@link StreamApplication} that demonstrates a repartition followed by a windowed count.
+ * A {@link StreamApplication} that demonstrates a partitionBy followed by a windowed count.
*/
public class RepartitionWindowApp implements StreamApplication {
-
- private static final Logger LOG = LoggerFactory.getLogger(RepartitionWindowApp.class);
+ static final String INPUT_TOPIC = "page-views";
+ static final String OUTPUT_TOPIC = "page-view-counts";
@Override
public void init(StreamGraph graph, Config config) {
+ MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
- MessageStream<String> pageViews = graph.<String, String, String>getInputStream("page-views", (k, v) -> v);
- Function<String, String> keyFn = pageView -> new PageView(pageView).getUserId();
-
- OutputStream<String, String, WindowPane<String, Collection<String>>> outputStream = graph
- .getOutputStream(TestRepartitionWindowApp.OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
+ OutputStream<KV<String, String>> outputStream =
+ graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new StringSerde()));
pageViews
- .partitionBy(keyFn)
- .window(Windows.keyedSessionWindow(keyFn, Duration.ofSeconds(3)))
+ .partitionBy(PageView::getUserId, pv -> pv, new KVSerde<>(new StringSerde(), new JsonSerdeV2<>(PageView.class)))
+ .window(Windows.keyedSessionWindow(KV::getKey, Duration.ofSeconds(3)))
+ .map(windowPane -> KV.of(windowPane.getKey().getKey(), String.valueOf(windowPane.getMessage().size())))
.sendTo(outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
index 65b48d3..4c83960 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/SessionWindowApp.java
@@ -21,35 +21,36 @@ package org.apache.samza.test.operator;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import java.time.Duration;
-import java.util.Collection;
/**
* A {@link StreamApplication} that demonstrates a filter followed by a session window.
*/
public class SessionWindowApp implements StreamApplication {
-
- private static final Logger LOG = LoggerFactory.getLogger(SessionWindowApp.class);
+ private static final String INPUT_TOPIC = "page-views";
+ private static final String OUTPUT_TOPIC = "page-view-counts";
private static final String FILTER_KEY = "badKey";
- private static final String OUTPUT_TOPIC = "Result";
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<PageView> pageViews = graph.<String, String, PageView>getInputStream("page-views", (k, v) -> new PageView(v));
- OutputStream<String, String, WindowPane<String, Collection<PageView>>> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
+ MessageStream<PageView> pageViews = graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
+ OutputStream<KV<String, Integer>> outputStream =
+ graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde()));
pageViews
.filter(m -> !FILTER_KEY.equals(m.getUserId()))
- .window(Windows.keyedSessionWindow(pageView -> pageView.getUserId(), Duration.ofSeconds(3)))
+ .window(Windows.keyedSessionWindow(PageView::getUserId, Duration.ofSeconds(3)))
+ .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
.sendTo(outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
index 57522eb..3745541 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TestRepartitionWindowApp.java
@@ -25,13 +25,13 @@ import org.junit.Test;
import java.util.Collections;
import java.util.List;
+import static org.apache.samza.test.operator.RepartitionWindowApp.INPUT_TOPIC;
+import static org.apache.samza.test.operator.RepartitionWindowApp.OUTPUT_TOPIC;
+
/**
* Test driver for {@link RepartitionWindowApp}.
*/
public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHarness {
-
- static final String INPUT_TOPIC = "page-views";
- static final String OUTPUT_TOPIC = "Result";
private static final String APP_NAME = "RepartitionedSessionizer";
@Test
@@ -41,11 +41,11 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa
createTopic(OUTPUT_TOPIC, 1);
// produce messages to different partitions.
- produceMessage(INPUT_TOPIC, 0, "userId1", "userId1,india,5.com");
- produceMessage(INPUT_TOPIC, 1, "userId2", "userId2,china,4.com");
- produceMessage(INPUT_TOPIC, 2, "userId1", "userId1,india,1.com");
- produceMessage(INPUT_TOPIC, 0, "userId1", "userId1,india,2.com");
- produceMessage(INPUT_TOPIC, 1, "userId1", "userId1,india,3.com");
+ produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"5.com\"}");
+ produceMessage(INPUT_TOPIC, 1, "userId2", "{\"userId\":\"userId2\", \"country\":\"china\",\"url\":\"4.com\"}");
+ produceMessage(INPUT_TOPIC, 2, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"1.com\"}");
+ produceMessage(INPUT_TOPIC, 0, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"2.com\"}");
+ produceMessage(INPUT_TOPIC, 1, "userId1", "{\"userId\":\"userId1\", \"country\":\"india\",\"url\":\"3.com\"}");
// run the application
RepartitionWindowApp app = new RepartitionWindowApp();
@@ -61,9 +61,9 @@ public class TestRepartitionWindowApp extends StreamApplicationIntegrationTestHa
// Assert that there are 4 messages for userId1 and 1 message for userId2.
Assert.assertTrue(key.equals("userId1") || key.equals("userId2"));
if ("userId1".equals(key)) {
- Assert.assertEquals(value, "4");
+ Assert.assertEquals("4", value);
} else {
- Assert.assertEquals(value, "1");
+ Assert.assertEquals("1", value);
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
index ae2608d..3f3e615 100644
--- a/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
+++ b/samza-test/src/test/java/org/apache/samza/test/operator/TumblingWindowApp.java
@@ -21,35 +21,37 @@ package org.apache.samza.test.operator;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
-import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import java.time.Duration;
-import java.util.Collection;
/**
* A {@link StreamApplication} that demonstrates a filter followed by a tumbling window.
*/
public class TumblingWindowApp implements StreamApplication {
-
- private static final Logger LOG = LoggerFactory.getLogger(TumblingWindowApp.class);
+ private static final String INPUT_TOPIC = "page-views";
+ private static final String OUTPUT_TOPIC = "page-view-counts";
private static final String FILTER_KEY = "badKey";
- private static final String OUTPUT_TOPIC = "Result";
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<PageView> pageViews = graph.<String, String, PageView>getInputStream("page-views", (k, v) -> new PageView(v));
- OutputStream<String, String, WindowPane<String, Collection<PageView>>> outputStream = graph
- .getOutputStream(OUTPUT_TOPIC, m -> m.getKey().getKey(), m -> new Integer(m.getMessage().size()).toString());
+ MessageStream<PageView> pageViews =
+ graph.getInputStream(INPUT_TOPIC, new JsonSerdeV2<>(PageView.class));
+ OutputStream<KV<String, Integer>> outputStream =
+ graph.getOutputStream(OUTPUT_TOPIC, new KVSerde<>(new StringSerde(), new IntegerSerde()));
pageViews
.filter(m -> !FILTER_KEY.equals(m.getUserId()))
- .window(Windows.keyedTumblingWindow(pageView -> pageView.getUserId(), Duration.ofSeconds(3)))
+ .window(Windows.keyedTumblingWindow(PageView::getUserId, Duration.ofSeconds(3)))
+ .map(m -> KV.of(m.getKey().getKey(), m.getMessage().size()))
.sendTo(outputStream);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
index fb8f17a..c550a3b 100644
--- a/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
+++ b/samza-test/src/test/java/org/apache/samza/test/processor/TestZkLocalApplicationRunner.java
@@ -23,7 +23,6 @@ import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import java.util.ArrayList;
import kafka.admin.AdminUtils;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
@@ -47,6 +46,8 @@ import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.test.StandaloneIntegrationTestHarness;
import org.apache.samza.test.StandaloneTestUtils;
import org.apache.samza.util.NoOpMetricsRegistry;
@@ -62,6 +63,7 @@ import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -523,21 +525,23 @@ public class TestZkLocalApplicationRunner extends StandaloneIntegrationTestHarne
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<String> inputStream = graph.getInputStream(inputTopic, (key, msg) -> {
- TestKafkaEvent incomingMessage = TestKafkaEvent.fromString((String) msg);
- if (streamApplicationCallback != null) {
- streamApplicationCallback.onMessageReceived(incomingMessage);
- }
- if (processedMessagesLatch != null) {
- processedMessagesLatch.countDown();
- }
- if (kafkaEventsConsumedLatch != null) {
- kafkaEventsConsumedLatch.countDown();
- }
- return incomingMessage.toString();
- });
- OutputStream<String, String, String> outputStream = graph.getOutputStream(outputTopic, event -> null, event -> event);
- inputStream.sendTo(outputStream);
+ MessageStream<String> inputStream = graph.getInputStream(inputTopic, new NoOpSerde<String>());
+ OutputStream<String> outputStream = graph.getOutputStream(outputTopic, new StringSerde());
+ inputStream
+ .map(msg -> {
+ TestKafkaEvent incomingMessage = TestKafkaEvent.fromString((String) msg);
+ if (streamApplicationCallback != null) {
+ streamApplicationCallback.onMessageReceived(incomingMessage);
+ }
+ if (processedMessagesLatch != null) {
+ processedMessagesLatch.countDown();
+ }
+ if (kafkaEventsConsumedLatch != null) {
+ kafkaEventsConsumedLatch.countDown();
+ }
+ return incomingMessage.toString();
+ })
+ .sendTo(outputStream);
}
}
}
[4/4] samza git commit: SAMZA-1109;
Allow setting Serdes for fluent API operators in code
Posted by ja...@apache.org.
SAMZA-1109; Allow setting Serdes for fluent API operators in code
Author: Prateek Maheshwari <pm...@linkedin.com>
Reviewers: Jagadish <ja...@apache.org>, Jacob Maes <jm...@apache.org>
Closes #293 from prateekm/serde-instance
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/f16ba269
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/f16ba269
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/f16ba269
Branch: refs/heads/master
Commit: f16ba2692ef85a624ce5f9050ddb6b9bce3dbf6c
Parents: 987180a
Author: Prateek Maheshwari <pm...@linkedin.com>
Authored: Wed Sep 27 17:08:04 2017 -0700
Committer: Jagadish <ja...@apache.org>
Committed: Wed Sep 27 17:08:04 2017 -0700
----------------------------------------------------------------------
build.gradle | 13 +-
.../versioned/jobs/configuration-table.html | 9 +-
.../java/org/apache/samza/operators/KV.java | 48 +++
.../apache/samza/operators/MessageStream.java | 34 +-
.../apache/samza/operators/OutputStream.java | 6 +-
.../org/apache/samza/operators/StreamGraph.java | 67 ++-
.../org/apache/samza/serializers/Serde.java | 6 +-
.../ByteBufferSerde.scala | 48 +++
.../ByteSerde.scala | 36 ++
.../DoubleSerde.scala | 45 ++
.../IntegerSerde.scala | 45 ++
.../JsonSerdeV2.scala | 91 +++++
.../org.apache.samza.serializers/KVSerde.scala | 55 +++
.../LongSerde.scala | 45 ++
.../NoOpSerde.scala | 37 ++
.../SerializableSerde.scala | 67 +++
.../StringSerde.scala | 49 +++
.../UUIDSerde.scala | 47 +++
.../TestByteBufferSerde.scala | 53 +++
.../TestByteSerde.scala | 38 ++
.../TestDoubleSerde.scala | 40 ++
.../TestIntegerSerde.scala | 37 ++
.../TestJsonSerdeV2.scala | 45 ++
.../TestLongSerde.scala | 40 ++
.../TestSerializableSerde.scala | 45 ++
.../TestStringSerde.scala | 37 ++
.../TestUUIDSerde.scala | 53 +++
.../org/apache/samza/execution/JobNode.java | 90 +++-
.../samza/operators/MessageStreamImpl.java | 25 +-
.../apache/samza/operators/StreamGraphImpl.java | 127 +++---
.../samza/operators/impl/InputOperatorImpl.java | 16 +-
.../samza/operators/impl/OperatorImpl.java | 18 +-
.../samza/operators/impl/OperatorImplGraph.java | 19 +-
.../operators/impl/OutputOperatorImpl.java | 22 +-
.../operators/impl/PartitionByOperatorImpl.java | 82 ++++
.../operators/impl/WindowOperatorImpl.java | 3 +-
.../samza/operators/spec/InputOperatorSpec.java | 35 +-
.../samza/operators/spec/OperatorSpec.java | 20 +-
.../samza/operators/spec/OperatorSpecs.java | 26 +-
.../operators/spec/OutputOperatorSpec.java | 11 +-
.../samza/operators/spec/OutputStreamImpl.java | 28 +-
.../operators/spec/PartitionByOperatorSpec.java | 76 ++++
.../stream/IntermediateMessageStreamImpl.java | 17 +-
.../serializers/IntermediateMessageSerde.java | 39 +-
.../apache/samza/task/StreamOperatorTask.java | 4 +-
.../apache/samza/config/SerializerConfig.scala | 6 +-
.../apache/samza/container/SamzaContainer.scala | 55 ++-
.../samza/serializers/ByteBufferSerde.scala | 48 ---
.../apache/samza/serializers/ByteSerde.scala | 36 --
.../apache/samza/serializers/DoubleSerde.scala | 45 --
.../apache/samza/serializers/IntegerSerde.scala | 45 --
.../apache/samza/serializers/JsonSerde.scala | 32 +-
.../apache/samza/serializers/LongSerde.scala | 45 --
.../serializers/MetricsSnapshotSerde.scala | 4 +-
.../samza/serializers/SerializableSerde.scala | 67 ---
.../apache/samza/serializers/StringSerde.scala | 44 --
.../apache/samza/serializers/UUIDSerde.scala | 47 ---
.../apache/samza/example/BroadcastExample.java | 18 +-
.../samza/example/KeyValueStoreExample.java | 26 +-
.../org/apache/samza/example/MergeExample.java | 20 +-
.../samza/example/OrderShipmentJoinExample.java | 28 +-
.../samza/example/PageViewCounterExample.java | 13 +-
.../samza/example/RepartitionExample.java | 22 +-
.../org/apache/samza/example/WindowExample.java | 8 +-
.../samza/execution/TestExecutionPlanner.java | 59 +--
.../execution/TestJobGraphJsonGenerator.java | 35 +-
.../org/apache/samza/execution/TestJobNode.java | 111 +++++
.../samza/operators/TestJoinOperator.java | 21 +-
.../samza/operators/TestMessageStreamImpl.java | 69 +++-
.../samza/operators/TestStreamGraphImpl.java | 407 +++++++++++++++++--
.../samza/operators/TestWindowOperator.java | 17 +-
.../operators/impl/TestOperatorImplGraph.java | 66 ++-
.../operators/impl/TestStreamOperatorImpl.java | 4 +-
.../samza/serializers/TestByteBufferSerde.scala | 53 ---
.../samza/serializers/TestByteSerde.scala | 38 --
.../samza/serializers/TestDoubleSerde.scala | 40 --
.../samza/serializers/TestIntegerSerde.scala | 37 --
.../samza/serializers/TestLongSerde.scala | 40 --
.../serializers/TestSerializableSerde.scala | 45 --
.../samza/serializers/TestStringSerde.scala | 37 --
.../samza/serializers/TestUUIDSerde.scala | 53 ---
.../apache/samza/config/Log4jSystemConfig.java | 2 +-
.../samza/logging/log4j/StreamAppender.java | 2 +-
samza-test/src/main/resources/log4j.xml | 44 +-
.../apache/samza/test/operator/PageView.java | 37 +-
.../test/operator/RepartitionWindowApp.java | 28 +-
.../samza/test/operator/SessionWindowApp.java | 23 +-
.../test/operator/TestRepartitionWindowApp.java | 20 +-
.../samza/test/operator/TumblingWindowApp.java | 24 +-
.../processor/TestZkLocalApplicationRunner.java | 36 +-
90 files changed, 2529 insertions(+), 1222 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 319e51f..16091ae 100644
--- a/build.gradle
+++ b/build.gradle
@@ -120,11 +120,22 @@ subprojects {
}
project(':samza-api') {
- apply plugin: 'java'
+ apply plugin: 'scala'
apply plugin: 'checkstyle'
+ // Force scala joint compilation
+ sourceSets.main.scala.srcDir "src/main/java"
+ sourceSets.test.scala.srcDir "src/test/java"
+
+ // Disable the Javac compiler by forcing joint compilation by scalac. This is equivalent to setting
+ // tasks.compileTestJava.enabled = false
+ sourceSets.main.java.srcDirs = []
+ sourceSets.test.java.srcDirs = []
+
dependencies {
compile "org.slf4j:slf4j-api:$slf4jVersion"
+ compile "org.codehaus.jackson:jackson-mapper-asl:$jacksonVersion"
+ compile "org.scala-lang:scala-library:$scalaLibVersion"
testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index d736d6b..b4a1d56 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -1138,7 +1138,14 @@
<dt><code>org.apache.samza.serializers.StringSerdeFactory</code></dt>
<dd>Encodes <code>java.lang.String</code> objects as UTF-8.</dd>
<dt><code>org.apache.samza.serializers.JsonSerdeFactory</code></dt>
- <dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.</dd>
+ <dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.<br/>
+ Note: This Serde enforces a dash-separated property naming convention, while JsonSerdeV2 doesn't.
+ This serde is primarily meant for Samza's internal usage, and is publicly available for backwards compatibility.</dd>
+ <dt><code>org.apache.samza.serializers.JsonSerdeV2Factory</code></dt>
+ <dd>Encodes nested structures of <code>java.util.Map</code>, <code>java.util.List</code> etc. as JSON.<br/>
+ Note: This Serde uses Jackson's default (camelCase) property naming convention. This serde should be <br/>
+ preferred over JsonSerde, especially in the High Level API, unless the dasherized naming convention is required <br/>
+ (e.g., for backwards compatibility).</dd>
<dt><code>org.apache.samza.serializers.LongSerdeFactory</code></dt>
<dd>Encodes <code>java.lang.Long</code> as binary (8 bytes fixed-length big-endian encoding).</dd>
<dt><code>org.apache.samza.serializers.DoubleSerdeFactory</code></dt>
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/operators/KV.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/KV.java b/samza-api/src/main/java/org/apache/samza/operators/KV.java
new file mode 100644
index 0000000..0bed3b9
--- /dev/null
+++ b/samza-api/src/main/java/org/apache/samza/operators/KV.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators;
+
+
+/**
+ * A key and value pair.
+ *
+ * @param <K> type of the key
+ * @param <V> type of the value
+ */
+public class KV<K, V> {
+ public final K key;
+ public final V value;
+
+ public static <K, V> KV<K, V> of(K key, V value) {
+ return new KV<>(key, value);
+ }
+
+ public KV(K key, V value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public K getKey() {
+ return key;
+ }
+
+ public V getValue() {
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
index bef1d3f..2a1045d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/MessageStream.java
@@ -26,6 +26,7 @@ import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.functions.SinkFunction;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
+import org.apache.samza.serializers.KVSerde;
import java.time.Duration;
import java.util.ArrayList;
@@ -67,7 +68,7 @@ public interface MessageStream<M> {
* Applies the provided function to messages in this {@link MessageStream} and returns the
* filtered {@link MessageStream}.
* <p>
- * The {@link Function} is a predicate which determines whether a message in this {@link MessageStream}
+ * The {@link FilterFunction} is a predicate which determines whether a message in this {@link MessageStream}
* should be retained in the filtered {@link MessageStream}.
*
* @param filterFn the predicate to filter messages from this {@link MessageStream}.
@@ -93,10 +94,8 @@ public interface MessageStream<M> {
* Allows sending messages in this {@link MessageStream} to an {@link OutputStream}.
*
* @param outputStream the output stream to send messages to
- * @param <K> the type of key in the outgoing message
- * @param <V> the type of message in the outgoing message
*/
- <K, V> void sendTo(OutputStream<K, V, M> outputStream);
+ void sendTo(OutputStream<M> outputStream);
/**
* Groups the messages in this {@link MessageStream} according to the provided {@link Window} semantics
@@ -171,8 +170,9 @@ public interface MessageStream<M> {
* intermediate stream on the {@code job.default.system}. This intermediate stream is both an output and
* input to the job.
* <p>
- * The key and message Serdes configured for the default system must be able to serialize and deserialize
- * types K and M respectively.
+ * Uses the provided {@link KVSerde} for serialization of keys and values. If the provided {@code serde} is null,
+ * uses the default serde provided via {@link StreamGraph#setDefaultSerde}, which must be a KVSerde.
+ * If no default serde has been provided <b>before</b> calling this method, no-op serdes are used for keys and values.
* <p>
* The number of partitions for this intermediate stream is determined as follows:
* If the stream is an eventual input to a {@link #join}, and the number of partitions for the other stream is known,
@@ -182,11 +182,25 @@ public interface MessageStream<M> {
* Else, the number of partitions is set to to the max of number of partitions for all input and output streams
* (excluding intermediate streams).
*
- * @param keyExtractor the {@link Function} to extract the output message key and partition key from
- * the input message
- * @param <K> the type of output message key and partition key
+ * @param <K> the type of output key
+ * @param <V> the type of output value
+ * @param keyExtractor the {@link Function} to extract the message key and partition key from the input message
+ * @param valueExtractor the {@link Function} to extract the value from the input message
+ * @param serde the {@link KVSerde} to use for (de)serializing the key and value.
* @return the repartitioned {@link MessageStream}
*/
- <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor);
+ <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
+ Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde);
+ /**
+ * Same as calling {@link #partitionBy(Function, Function, KVSerde)} with a null KVSerde.
+ *
+ * @param keyExtractor the {@link Function} to extract the message key and partition key from the input message
+ * @param valueExtractor the {@link Function} to extract the value from the input message
+ * @param <K> the type of output key
+ * @param <V> the type of output value
+ * @return the repartitioned {@link MessageStream}
+ */
+ <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
+ Function<? super M, ? extends V> valueExtractor);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
index 7335d56..1ebb535 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/OutputStream.java
@@ -23,11 +23,9 @@ import org.apache.samza.annotation.InterfaceStability;
/**
* An output stream to send messages to.
*
- * @param <K> the type of key in the outgoing message
- * @param <V> the type of message in the outgoing message
- * @param <M> the type of message in this {@link OutputStream}
+ * @param <M> the type of message being sent to this {@link OutputStream}
*/
@InterfaceStability.Unstable
-public interface OutputStream<K, V, M> {
+public interface OutputStream<M> {
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
index ea6721b..17223b1 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/StreamGraph.java
@@ -19,31 +19,56 @@
package org.apache.samza.operators;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.serializers.Serde;
-import java.util.function.BiFunction;
-import java.util.function.Function;
/**
- * Provides access to {@link MessageStream}s and {@link OutputStream}s used to describe the processing logic.
+ * Provides access to {@link MessageStream}s and {@link OutputStream}s used to describe application logic.
*/
@InterfaceStability.Unstable
public interface StreamGraph {
/**
+ * Sets the default {@link Serde} to use for (de)serializing messages.
+ * <p>
+ * If the default serde is set, it must be set <b>before</b> creating any input or output streams.
+ * If no explicit or default serdes are provided, a NoOpSerde is used for keys and values. This means that any
+ * streams created without explicit or default serdes should be cast to {@code MessageStream<KV<Object, Object>>}.
+ * Providing an incompatible message type for the input/output streams that use the default serde will result in
+ * {@link ClassCastException}s at runtime.
+ *
+ * @param serde the default message {@link Serde} to use
+ */
+ void setDefaultSerde(Serde<?> serde);
+
+ /**
* Gets the input {@link MessageStream} corresponding to the {@code streamId}.
* <p>
* Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
*
* @param streamId the unique ID for the stream
- * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
- * in the input {@link MessageStream}
- * @param <K> the type of key in the incoming message
- * @param <V> the type of message in the incoming message
+ * @param serde the {@link Serde} to use for deserializing incoming messages
+ * @param <M> the type of messages in the input {@link MessageStream}
+ * @return the input {@link MessageStream}
+ * @throws IllegalStateException when invoked multiple times with the same {@code streamId}
+ */
+ <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde);
+
+ /**
+ * Same as {@link #getInputStream(String, Serde)}, but uses the default {@link Serde} provided via
+ * {@link #setDefaultSerde(Serde)} for deserializing input messages.
+ * <p>
+ * If no default serde has been provided <b>before</b> calling this method, a no-op serde is used.
+ * Providing a message type {@code M} that is incompatible with the default Serde will result in
+ * {@link ClassCastException}s at runtime.
+ * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
+ *
+ * @param streamId the unique ID for the stream
* @param <M> the type of message in the input {@link MessageStream}
* @return the input {@link MessageStream}
* @throws IllegalStateException when invoked multiple times with the same {@code streamId}
*/
- <K, V, M> MessageStream<M> getInputStream(String streamId, BiFunction<? super K, ? super V, ? extends M> msgBuilder);
+ <M> MessageStream<M> getInputStream(String streamId);
/**
* Gets the {@link OutputStream} corresponding to the {@code streamId}.
@@ -51,16 +76,28 @@ public interface StreamGraph {
* Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
*
* @param streamId the unique ID for the stream
- * @param keyExtractor the {@link Function} to extract the outgoing key from the output message
- * @param msgExtractor the {@link Function} to extract the outgoing message from the output message
- * @param <K> the type of key in the outgoing message
- * @param <V> the type of message in the outgoing message
- * @param <M> the type of message in the {@link OutputStream}
+ * @param serde the {@link Serde} to use for serializing outgoing messages
+ * @param <M> the type of messages in the {@link OutputStream}
+ * @return the {@link OutputStream}
+ * @throws IllegalStateException when invoked multiple times with the same {@code streamId}
+ */
+ <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde);
+
+ /**
+ * Same as {@link #getOutputStream(String, Serde)}, but uses the default {@link Serde} provided via
+ * {@link #setDefaultSerde(Serde)} for serializing output messages.
+ * <p>
+ * If no default serde has been provided <b>before</b> calling this method, a no-op serde is used.
+ * Providing a message type {@code M} that is incompatible with the default Serde will result in
+ * {@link ClassCastException}s at runtime.
+ * Multiple invocations of this method with the same {@code streamId} will throw an {@link IllegalStateException}.
+ *
+ * @param streamId the unique ID for the stream
+ * @param <M> the type of messages in the {@link OutputStream}
* @return the output {@link MessageStream}
* @throws IllegalStateException when invoked multiple times with the same {@code streamId}
*/
- <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
- Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor);
+ <M> OutputStream<M> getOutputStream(String streamId);
/**
* Sets the {@link ContextManager} for this {@link StreamGraph}.
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/serializers/Serde.java b/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
index a59b8c2..ac33c2d 100644
--- a/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
+++ b/samza-api/src/main/java/org/apache/samza/serializers/Serde.java
@@ -19,12 +19,16 @@
package org.apache.samza.serializers;
+import java.io.Serializable;
+
/**
* A Serde is a convenience type that implements both the {@link org.apache.samza.serializers.Serializer} and
* {@link org.apache.samza.serializers.Deserializer} interfaces, allowing it to both read and write data
* in its value type, T.
*
+ * A serde instance itself must be {@link Serializable} using Java serialization.
+ *
* @param <T> The type of serialized object implementations can both read and write
*/
-public interface Serde<T> extends Serializer<T>, Deserializer<T> {
+public interface Serde<T> extends Serializer<T>, Deserializer<T>, Serializable {
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
new file mode 100644
index 0000000..adb8781
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/ByteBufferSerde.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.config.Config
+import java.nio.ByteBuffer
+
+/**
+ * Factory for [[ByteBufferSerde]]. Neither the serde name nor the config is consulted;
+ * every call hands back a fresh instance.
+ */
+class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] {
+  override def getSerde(name: String, config: Config): Serde[ByteBuffer] = new ByteBufferSerde()
+}
+
+/**
+ * Serde for ByteBuffers. Serialization copies the buffer's remaining bytes into a new array;
+ * deserialization wraps the given array in a ByteBuffer. Null maps to null in both directions.
+ */
+class ByteBufferSerde extends Serde[ByteBuffer] {
+  def toBytes(byteBuffer: ByteBuffer) = byteBuffer match {
+    case null => null
+    case source =>
+      val copied = new Array[Byte](source.remaining())
+      // read through a duplicate so the caller's position and limit stay untouched
+      source.duplicate().get(copied)
+      copied
+  }
+
+  def fromBytes(bytes: Array[Byte]) = bytes match {
+    case null => null
+    case raw => ByteBuffer.wrap(raw)
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
new file mode 100644
index 0000000..968da26
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/ByteSerde.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.config.Config
+
+/**
+ * Factory for [[ByteSerde]], the pass-through serde for raw byte arrays.
+ * The name and config arguments are not consulted.
+ */
+class ByteSerdeFactory extends SerdeFactory[Array[Byte]] {
+  override def getSerde(name: String, config: Config): Serde[Array[Byte]] = new ByteSerde()
+}
+
+/**
+ * Pass-through serde for binary messages: both directions return the very array
+ * they were given (no copy is made), so null stays null.
+ */
+class ByteSerde extends Serde[Array[Byte]] {
+  override def toBytes(bytes: Array[Byte]): Array[Byte] = bytes
+
+  override def fromBytes(bytes: Array[Byte]): Array[Byte] = bytes
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
new file mode 100644
index 0000000..7981d2c
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/DoubleSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * Factory for [[DoubleSerde]]. The name and config arguments are ignored.
+ */
+class DoubleSerdeFactory extends SerdeFactory[java.lang.Double] {
+  override def getSerde(name: String, config: Config): Serde[java.lang.Double] = new DoubleSerde()
+}
+
+/**
+ * Serde for boxed doubles, written as 8 bytes in ByteBuffer's default byte order.
+ * Null maps to null in both directions.
+ */
+class DoubleSerde extends Serde[java.lang.Double] {
+  def toBytes(obj: java.lang.Double): Array[Byte] = obj match {
+    case null => null
+    case boxed => ByteBuffer.allocate(8).putDouble(boxed.doubleValue()).array
+  }
+
+  // big-endian by default
+  def fromBytes(bytes: Array[Byte]): java.lang.Double = bytes match {
+    case null => null
+    case raw => ByteBuffer.wrap(raw).getDouble
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
new file mode 100644
index 0000000..46509f7
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/IntegerSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * Factory for [[IntegerSerde]]. The name and config arguments are ignored.
+ */
+class IntegerSerdeFactory extends SerdeFactory[java.lang.Integer] {
+  override def getSerde(name: String, config: Config): Serde[java.lang.Integer] = new IntegerSerde()
+}
+
+/**
+ * Serde for boxed integers, written as 4 bytes in ByteBuffer's default byte order.
+ * Null maps to null in both directions.
+ */
+class IntegerSerde extends Serde[java.lang.Integer] {
+  def toBytes(obj: java.lang.Integer): Array[Byte] = obj match {
+    case null => null
+    case boxed => ByteBuffer.allocate(4).putInt(boxed.intValue).array
+  }
+
+  // big-endian by default
+  def fromBytes(bytes: Array[Byte]): java.lang.Integer = bytes match {
+    case null => null
+    case raw => ByteBuffer.wrap(raw).getInt
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala b/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
new file mode 100644
index 0000000..446035c
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/JsonSerdeV2.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
+import org.codehaus.jackson.`type`.TypeReference
+import org.codehaus.jackson.map.ObjectMapper
+import org.slf4j.LoggerFactory
+
+
+/**
+ * A serializer for JSON strings. JsonSerdeV2 differs from JsonSerde in that:
+ * <ol>
+ *   <li>
+ *     It allows specifying the specific POJO type to deserialize to (using JsonSerdeV2(Class[T])
+ *     or JsonSerdeV2#of(Class[T])). JsonSerde always returns a LinkedHashMap<String, Object> upon deserialization.
+ *   </li>
+ *   <li>
+ *     It uses Jackson's default 'camelCase' property naming convention, which simplifies defining
+ *     the POJO to bind to. JsonSerde enforces the 'dash-separated' property naming convention.
+ *   </li>
+ * </ol>
+ * This JsonSerdeV2 should be preferred over JsonSerde for High Level API applications, unless
+ * backwards compatibility with the older data format (with dasherized names) is required.
+ *
+ * @param clazzOption the class of the POJO being (de)serialized. If this is None,
+ *                    a LinkedHashMap<String, Object> is returned upon deserialization.
+ * @tparam T the type of the POJO being (de)serialized.
+ */
+class JsonSerdeV2[T] private(clazzOption: Option[Class[T]]) extends Serde[T] {
+  private val LOG = LoggerFactory.getLogger(classOf[JsonSerdeV2[T]])
+  // transient + lazy: the mapper is left out of Java serialization of this serde and is
+  // created on first use
+  @transient lazy private val mapper = new ObjectMapper()
+
+  def this() {
+    this(None)
+  }
+
+  def this(clazz: Class[T]) {
+    this(Option(clazz))
+  }
+
+  /**
+   * Serializes the object to UTF-8 encoded JSON.
+   *
+   * NOTE(review): unlike the sibling serdes, a null obj is handed to the mapper as-is rather
+   * than being short-circuited to null -- confirm whether that is intended.
+   *
+   * @param obj the object to serialize
+   * @return the UTF-8 bytes of the JSON text
+   * @throws SamzaException wrapping any exception raised while producing the JSON
+   */
+  def toBytes(obj: T): Array[Byte] = {
+    try {
+      val str = mapper.writeValueAsString(obj)
+      str.getBytes("UTF-8")
+    } catch {
+      case e: Exception => throw new SamzaException(e)
+    }
+  }
+
+  /**
+   * Deserializes UTF-8 encoded JSON into the configured POJO class, or through a TypeReference
+   * when no class was supplied.
+   *
+   * @param bytes the JSON bytes; may be null
+   * @return the deserialized object, or null when bytes is null
+   * @throws SamzaException wrapping any exception raised while reading the JSON
+   */
+  def fromBytes(bytes: Array[Byte]): T = {
+    if (bytes == null) {
+      // Mirror the sibling serdes: previously this fell into `new String(null, ...)` and
+      // surfaced as a raw NullPointerException outside the try block.
+      null.asInstanceOf[T]
+    } else {
+      val str = new String(bytes, "UTF-8")
+      try {
+        clazzOption match {
+          case Some(clazz) => mapper.readValue(str, clazz)
+          case None => mapper.readValue(str, new TypeReference[T]() {})
+        }
+      } catch {
+        case e: Exception =>
+          LOG.debug(s"Error deserializing message: $str", e)
+          throw new SamzaException(e)
+      }
+    }
+  }
+
+}
+
+/**
+ * Companion object holding the factory method for typed serdes.
+ */
+object JsonSerdeV2 {
+  /**
+   * Creates a serde bound to the given POJO class.
+   *
+   * @param clazz the class to deserialize messages into
+   * @tparam T the POJO type
+   * @return a new JsonSerdeV2 for clazz
+   */
+  def of[T](clazz: Class[T]): JsonSerdeV2[T] = new JsonSerdeV2[T](clazz)
+}
+
+/**
+ * Factory producing a JsonSerdeV2 built with the no-arg constructor, i.e. without a target
+ * POJO class. The name and config arguments are ignored.
+ */
+class JsonSerdeV2Factory extends SerdeFactory[Object] {
+  override def getSerde(name: String, config: Config) = new JsonSerdeV2()
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
new file mode 100644
index 0000000..5b0a6e3
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/KVSerde.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.SamzaException
+import org.apache.samza.operators.KV
+
+/**
+ * Companion object with a factory method that builds a [[KVSerde]] from its key and value serdes.
+ */
+object KVSerde {
+  def of[K, V](keySerde: Serde[K], valueSerde: Serde[V]) = {
+    new KVSerde[K, V](keySerde, valueSerde)
+  }
+}
+
+/**
+ * A marker serde signalling that messages are keyed and are to be deserialized as K-V pairs.
+ * It exists for places where a single Serde parameter or configuration is required; it only
+ * carries the key and value serdes and performs no (de)serialization itself.
+ *
+ * @tparam K type of the key in the message
+ * @tparam V type of the value in the message
+ */
+class KVSerde[K, V](keySerde: Serde[K], valueSerde: Serde[V]) extends Serde[KV[K, V]] {
+
+  def getKeySerde: Serde[K] = keySerde
+
+  def getValueSerde: Serde[V] = valueSerde
+
+  /*
+   * Implementation Note: the framework must never (de)serialize through this serde directly;
+   * it has to wire up and use the constituent keySerde and valueSerde instead. Both methods
+   * below therefore always throw.
+   * NOTE(review): fromBytes raises NotImplementedError while toBytes raises SamzaException;
+   * the asymmetry is kept as-is here -- confirm whether it is intentional.
+   */
+
+  override def fromBytes(bytes: Array[Byte]): Nothing =
+    throw new NotImplementedError(directUseMessage)
+
+  override def toBytes(`object`: KV[K, V]): Nothing =
+    throw new SamzaException(directUseMessage)
+
+  // Shared text of the error raised by both methods above.
+  private def directUseMessage: String =
+    "This is a marker serde and must not be used directly. " +
+      "Samza must wire up and use the keySerde and valueSerde instead."
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
new file mode 100644
index 0000000..41ff598
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/LongSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import org.apache.samza.config.Config
+
+/**
+ * Factory for [[LongSerde]]. The name and config arguments are ignored.
+ */
+class LongSerdeFactory extends SerdeFactory[java.lang.Long] {
+  override def getSerde(name: String, config: Config): Serde[java.lang.Long] = new LongSerde()
+}
+
+/**
+ * Serde for boxed longs, written as 8 bytes in ByteBuffer's default byte order.
+ * Null maps to null in both directions.
+ */
+class LongSerde extends Serde[java.lang.Long] {
+  def toBytes(obj: java.lang.Long): Array[Byte] = obj match {
+    case null => null
+    case boxed => ByteBuffer.allocate(8).putLong(boxed.longValue()).array
+  }
+
+  // big-endian by default
+  def fromBytes(bytes: Array[Byte]): java.lang.Long = bytes match {
+    case null => null
+    case raw => ByteBuffer.wrap(raw).getLong
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
new file mode 100644
index 0000000..c656526
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/NoOpSerde.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.serializers
+
+/**
+ * Marker serde declaring that messages must be passed along without any serialization or
+ * deserialization -- the same behavior as when no serde is provided. Intended for places
+ * where a Serde parameter or configuration is required.
+ * Not to be confused with [[ByteSerde]], which is a pass-through serde for byte arrays.
+ *
+ * @tparam T type of messages which should not be serialized or deserialized
+ */
+class NoOpSerde[T] extends Serde[T] {
+
+  override def toBytes(obj: T): Array[Byte] = {
+    throw new NotImplementedError("NoOpSerde toBytes should not be invoked by the framework.")
+  }
+
+  override def fromBytes(bytes: Array[Byte]): T = {
+    throw new NotImplementedError("NoOpSerde fromBytes should not be invoked by the framework.")
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
new file mode 100644
index 0000000..c43f863
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/SerializableSerde.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.io.ByteArrayInputStream
+import java.io.ByteArrayOutputStream
+import java.io.ObjectInputStream
+import java.io.ObjectOutputStream
+
+import org.apache.samza.config.Config
+
+/**
+ * Factory for [[SerializableSerde]]. The name and config arguments are ignored.
+ */
+class SerializableSerdeFactory[T <: java.io.Serializable] extends SerdeFactory[T] {
+  override def getSerde(name: String, config: Config): Serde[T] = new SerializableSerde[T]()
+}
+
+/**
+ * Serde backed by Java object serialization (ObjectOutputStream / ObjectInputStream).
+ * Null maps to null in both directions.
+ *
+ * NOTE(review): fromBytes calls readObject on whatever bytes it is handed. Native Java
+ * deserialization of data from untrusted producers is unsafe -- confirm the inputs are trusted.
+ */
+class SerializableSerde[T <: java.io.Serializable] extends Serde[T] {
+  def toBytes(obj: T): Array[Byte] = {
+    if (obj == null) {
+      null
+    } else {
+      val buffer = new ByteArrayOutputStream
+      val objectOut = new ObjectOutputStream(buffer)
+      // close (and thereby flush) the object stream before reading the bytes back out
+      try objectOut.writeObject(obj) finally objectOut.close()
+      buffer.toByteArray
+    }
+  }
+
+  def fromBytes(bytes: Array[Byte]): T = {
+    if (bytes == null) {
+      null.asInstanceOf[T]
+    } else {
+      val source = new ByteArrayInputStream(bytes)
+      val objectIn = new ObjectInputStream(source)
+      try objectIn.readObject.asInstanceOf[T] finally objectIn.close()
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
new file mode 100644
index 0000000..c69c402
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/StringSerde.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.apache.samza.config.Config
+
+/**
+ * Factory for [[StringSerde]]. The charset name is looked up under the "encoding" key of the
+ * supplied config, with "UTF-8" passed as the default; the serde name is ignored.
+ */
+class StringSerdeFactory extends SerdeFactory[String] {
+  override def getSerde(name: String, config: Config): Serde[String] = {
+    val encoding = config.get("encoding", "UTF-8")
+    new StringSerde(encoding)
+  }
+}
+
+/**
+ * Serde converting between strings and bytes using the named charset.
+ * Null maps to null in both directions.
+ *
+ * @param encoding name of the charset used for both encoding and decoding
+ */
+class StringSerde(val encoding: String) extends Serde[String] {
+  // constructor (for Java) that defaults to UTF-8 encoding
+  def this() = this("UTF-8")
+
+  def toBytes(obj: String): Array[Byte] =
+    if (obj == null) null else obj.getBytes(encoding)
+
+  def fromBytes(bytes: Array[Byte]): String =
+    if (bytes == null) null else new String(bytes, encoding)
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala b/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
new file mode 100644
index 0000000..88d4327
--- /dev/null
+++ b/samza-api/src/main/scala/org.apache.samza.serializers/UUIDSerde.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.ByteBuffer
+import java.util.UUID
+
+import org.apache.samza.config.Config
+
+/**
+ * Factory for [[UUIDSerde]]. The name and config arguments are ignored.
+ */
+class UUIDSerdeFactory extends SerdeFactory[UUID] {
+  override def getSerde(name: String, config: Config): Serde[UUID] = new UUIDSerde()
+}
+
+/**
+ * Serde for UUIDs, laid out as 16 bytes: the most significant long followed by the least
+ * significant long. Null maps to null in both directions.
+ */
+class UUIDSerde() extends Serde[UUID] {
+  def toBytes(obj: UUID): Array[Byte] = obj match {
+    case null => null
+    case uuid =>
+      val out = ByteBuffer.allocate(16)
+      out.putLong(uuid.getMostSignificantBits)
+      out.putLong(uuid.getLeastSignificantBits)
+      out.array
+  }
+
+  def fromBytes(bytes: Array[Byte]): UUID = bytes match {
+    case null => null
+    case raw =>
+      val in = ByteBuffer.wrap(raw)
+      // read order matters: most significant bits were written first
+      val mostSignificant = in.getLong
+      val leastSignificant = in.getLong
+      new UUID(mostSignificant, leastSignificant)
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestByteBufferSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestByteBufferSerde.scala
new file mode 100644
index 0000000..eddfb0a
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestByteBufferSerde.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+import org.junit.Assert._
+import org.junit.Test
+import java.nio.ByteBuffer
+
+/** Unit tests for ByteBufferSerde. */
+class TestByteBufferSerde {
+  /** Null passes through both directions and a wrapped array converts to and from its bytes. */
+  @Test
+  def testSerde {
+    val serde = new ByteBufferSerde
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val bytes = "A lazy way of creating a byte array".getBytes()
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    // Mark the current position so the buffer can be rewound after serialization.
+    byteBuffer.mark()
+    // JUnit takes the expected value first; keeping that order makes failure messages read correctly.
+    assertArrayEquals(bytes, serde.toBytes(byteBuffer))
+    byteBuffer.reset()
+    assertEquals(byteBuffer, serde.fromBytes(bytes))
+  }
+
+  /** Serializing must leave the caller's buffer position and limit untouched. */
+  @Test
+  def testSerializationPreservesInput {
+    val serde = new ByteBufferSerde
+    val bytes = "A lazy way of creating a byte array".getBytes()
+    val byteBuffer = ByteBuffer.wrap(bytes)
+    byteBuffer.get() // advance position by 1
+    serde.toBytes(byteBuffer)
+
+    assertEquals(byteBuffer.capacity(), byteBuffer.limit())
+    assertEquals(1, byteBuffer.position())
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestByteSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestByteSerde.scala
new file mode 100644
index 0000000..f605762
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestByteSerde.scala
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Unit tests for ByteSerde. */
+class TestByteSerde {
+  /** Null passes through both directions and a byte array is returned with the same contents. */
+  @Test
+  def testByteSerde {
+    val serde = new ByteSerde
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val testBytes = "A lazy way of creating a byte array".getBytes()
+    // JUnit takes the expected value first; keeping that order makes failure messages read correctly.
+    assertArrayEquals(testBytes, serde.toBytes(testBytes))
+    assertArrayEquals(testBytes, serde.fromBytes(testBytes))
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestDoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestDoubleSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestDoubleSerde.scala
new file mode 100644
index 0000000..60241b5
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestDoubleSerde.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Unit tests for DoubleSerde. */
+class TestDoubleSerde {
+  /** Null passes through both directions and a double round-trips through its expected 8-byte form. */
+  @Test
+  def testDoubleSerde {
+    val serde = new DoubleSerde
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val fooBar = 9.156013e-002
+    val fooBarBytes = serde.toBytes(fooBar)
+    // No byte dump to stderr here: the array assertion below already reports any mismatch.
+    assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), fooBarBytes)
+    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestIntegerSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestIntegerSerde.scala
new file mode 100644
index 0000000..a3e7e1f
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestIntegerSerde.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Unit tests for IntegerSerde. */
+class TestIntegerSerde {
+  /** Null passes through both directions and 37 round-trips through its expected 4-byte form. */
+  @Test
+  def testIntegerSerde {
+    val serde = new IntegerSerde
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val sample = 37
+    val encoded = serde.toBytes(sample)
+    val expectedBytes = Array[Byte](0, 0, 0, 37)
+    assertArrayEquals(expectedBytes, encoded)
+    assertEquals(sample, serde.fromBytes(encoded))
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestJsonSerdeV2.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestJsonSerdeV2.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestJsonSerdeV2.scala
new file mode 100644
index 0000000..f5202c8
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestJsonSerdeV2.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+
+import org.junit.Assert._
+import org.junit.Test
+
+import scala.collection.JavaConverters._
+
+
+/** Unit tests for JsonSerdeV2. */
+class TestJsonSerdeV2 {
+  /**
+   * A HashMap must round-trip through the serde, and each of its entries must be
+   * serializable on its own through a serde typed for Map.Entry.
+   */
+  @Test
+  def testJsonSerdeV2ShouldWork {
+    val serde = new JsonSerdeV2[java.util.HashMap[String, Object]]
+    // Integer.valueOf replaces the deprecated Integer constructor; the boxed value is equal.
+    val obj = new java.util.HashMap[String, Object](
+      Map[String, Object]("hi" -> "bye", "why" -> java.lang.Integer.valueOf(2)).asJava)
+    val bytes = serde.toBytes(obj)
+    assertEquals(obj, serde.fromBytes(bytes))
+
+    val serdeHashMapEntry = new JsonSerdeV2[java.util.Map.Entry[String, Object]]
+    obj.entrySet().asScala.foreach(entry => {
+      try {
+        serdeHashMapEntry.toBytes(entry)
+      } catch {
+        // Keep the original exception as the cause so the failure report shows why serialization broke.
+        case e: Exception =>
+          throw new AssertionError("HashMap Entry serialization failed!", e)
+      }
+    })
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestLongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestLongSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestLongSerde.scala
new file mode 100644
index 0000000..77a7498
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestLongSerde.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.util.Arrays
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Unit tests for LongSerde. */
+class TestLongSerde {
+  /** Null passes through both directions and a long round-trips through its expected 8-byte form. */
+  @Test
+  def testLongSerde {
+    val serde = new LongSerde
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val fooBar = 1234123412341234L
+    val fooBarBytes = serde.toBytes(fooBar)
+    // No byte dump to stderr here: the array assertion below already reports any mismatch.
+    assertArrayEquals(Array[Byte](0, 4, 98, 109, -65, -102, 1, -14), fooBarBytes)
+    assertEquals(fooBar, serde.fromBytes(fooBarBytes))
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestSerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestSerializableSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestSerializableSerde.scala
new file mode 100644
index 0000000..8e899d0
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestSerializableSerde.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Unit tests for SerializableSerde. */
+class TestSerializableSerde {
+  /** Null passes through both directions and a String is written in the expected serialized form. */
+  @Test
+  def testSerializableSerde {
+    val serde = new SerializableSerde[String]
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val text = "String is serializable"
+
+    // Expected output: a fixed 7-byte prefix followed by the UTF-8 bytes of the string.
+    // The prefix's last byte, 0x16, is 22, which matches the length of `text`.
+    val header = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte)
+    val expectedBytes = header ++ text.getBytes("UTF-8")
+
+    val serialized = serde.toBytes(text)
+    assertArrayEquals(expectedBytes, serialized)
+
+    val restored: String = serde.fromBytes(serialized)
+    assertEquals(text, restored)
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestStringSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestStringSerde.scala
new file mode 100644
index 0000000..a1e8e88
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestStringSerde.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Unit tests for StringSerde. */
+class TestStringSerde {
+  /** Null passes through both directions and a string round-trips through its UTF-8 bytes. */
+  @Test
+  def testStringSerde {
+    val serde = new StringSerde("UTF-8")
+    assertNull(serde.toBytes(null))
+    assertNull(serde.fromBytes(null))
+
+    val text = "foo bar"
+    val encoded = serde.toBytes(text)
+    val expectedBytes = text.getBytes("UTF-8")
+    assertArrayEquals(expectedBytes, encoded)
+    assertEquals(text, serde.fromBytes(encoded))
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-api/src/test/scala/org.apache.samza.serializers/TestUUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-api/src/test/scala/org.apache.samza.serializers/TestUUIDSerde.scala b/samza-api/src/test/scala/org.apache.samza.serializers/TestUUIDSerde.scala
new file mode 100644
index 0000000..04ddcdb
--- /dev/null
+++ b/samza-api/src/test/scala/org.apache.samza.serializers/TestUUIDSerde.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.serializers
+
+import java.nio.BufferUnderflowException
+import java.util.UUID
+
+import org.junit.Assert._
+import org.junit.Test
+
+/** Unit tests for UUIDSerde. */
+class TestUUIDSerde {
+  private val serde = new UUIDSerde
+
+  /** A UUID is written as its two longs, most significant first, and reads back equal. */
+  @Test
+  def testUUIDSerde {
+    val uuid = new UUID(13, 42)
+    val bytes = serde.toBytes(uuid)
+    assertArrayEquals(Array[Byte](0, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0, 42), bytes)
+    assertEquals(uuid, serde.fromBytes(bytes))
+  }
+
+  /** A null UUID serializes to null. */
+  @Test
+  def testToBytesWhenNull {
+    assertNull(serde.toBytes(null))
+  }
+
+  /** A null byte array deserializes to null. */
+  @Test
+  def testFromBytesWhenNull {
+    assertNull(serde.fromBytes(null))
+  }
+
+  /** Input too short to hold a UUID must raise BufferUnderflowException. */
+  @Test(expected = classOf[BufferUnderflowException])
+  def testFromBytesWhenInvalid {
+    // The call itself is expected to throw, so no assertion is wrapped around it.
+    serde.fromBytes(Array[Byte](0))
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 88b24ba..fea42f2 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -21,27 +21,34 @@ package org.apache.samza.execution;
import com.google.common.base.Joiner;
import java.util.ArrayList;
+import java.util.Base64;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.util.MathUtils;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.util.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A JobNode is a physical execution unit. In RemoteExecutionEnvironment, it's a job that will be submitted
* to remote cluster. In LocalExecutionEnvironment, it's a set of StreamProcessors for local execution.
@@ -127,11 +134,88 @@ public class JobNode {
// write input/output streams to configs
inEdges.stream().filter(StreamEdge::isIntermediate).forEach(edge -> addStreamConfig(edge, configs));
+ // write serialized serde instances and stream serde configs to configs
+ addSerdeConfigs(configs);
+
log.info("Job {} has generated configs {}", jobName, configs);
String configPrefix = String.format(CONFIG_JOB_PREFIX, jobName);
- // TODO: Disallow user specifying job inputs/outputs. This info comes strictly from the pipeline.
- return new JobConfig(Util.rewriteConfig(extractScopedConfig(config, new MapConfig(configs), configPrefix)));
+
+ // Disallow user specified job inputs/outputs. This info comes strictly from the pipeline.
+ Map<String, String> allowedConfigs = new HashMap<>(config);
+ if (allowedConfigs.containsKey(TaskConfig.INPUT_STREAMS())) {
+ log.warn("Specifying task inputs in configuration is not allowed with Fluent API. "
+ + "Ignoring configured value for " + TaskConfig.INPUT_STREAMS());
+ allowedConfigs.remove(TaskConfig.INPUT_STREAMS());
+ }
+
+ log.debug("Job {} has allowed configs {}", jobName, allowedConfigs);
+ return new JobConfig(
+ Util.rewriteConfig(
+ extractScopedConfig(new MapConfig(allowedConfigs), new MapConfig(configs), configPrefix)));
+ }
+
+ /**
+ * Serializes the {@link Serde} instances for operators, adds them to the provided config, and
+ * sets the serde configuration for the input/output/intermediate streams appropriately.
+ *
+ * We try to preserve the number of Serde instances before and after serialization. However, we don't
+ * guarantee that references shared between these serde instances (e.g. a Jackson ObjectMapper shared
+ * between two json serdes) are shared after deserialization too.
+ *
+ * Ideally all the user defined objects in the application should be serialized and de-serialized in one pass
+ * from the same output/input stream so that we can maintain reference sharing relationships.
+ *
+ * @param configs the configs to add serialized serde instances and stream serde configs to
+ */
+ protected void addSerdeConfigs(Map<String, String> configs) {
+   // streamId -> serde instance, gathered separately for keys and for messages
+   Map<String, Serde> keySerdes = new HashMap<>();
+   Map<String, Serde> msgSerdes = new HashMap<>();
+
+   // input side: serdes come from the InputOperatorSpec registered for each incoming edge
+   Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators();
+   inEdges.forEach(inEdge -> {
+     StreamSpec inSpec = inEdge.getStreamSpec();
+     String inStreamId = inSpec.getId();
+     InputOperatorSpec inputSpec = inputOperators.get(inSpec);
+     Serde inKeySerde = inputSpec.getKeySerde();
+     Serde inMsgSerde = inputSpec.getValueSerde();
+     keySerdes.put(inStreamId, inKeySerde);
+     msgSerdes.put(inStreamId, inMsgSerde);
+   });
+
+   // output side: serdes come from the OutputStreamImpl registered for each outgoing edge
+   Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams();
+   outEdges.forEach(outEdge -> {
+     StreamSpec outSpec = outEdge.getStreamSpec();
+     String outStreamId = outSpec.getId();
+     OutputStreamImpl outputImpl = outputStreams.get(outSpec);
+     Serde outKeySerde = outputImpl.getKeySerde();
+     Serde outMsgSerde = outputImpl.getValueSerde();
+     keySerdes.put(outStreamId, outKeySerde);
+     msgSerdes.put(outStreamId, outMsgSerde);
+   });
+
+   // give every distinct serde instance a generated name and store its serialized form in the config
+   Collection<Serde> uniqueSerdes = new HashSet<>(keySerdes.values());
+   uniqueSerdes.addAll(msgSerdes.values());
+   SerializableSerde<Serde> serdeSerializer = new SerializableSerde<>();
+   Base64.Encoder encoder = Base64.getEncoder();
+   Map<Serde, String> serdeNames = new HashMap<>();
+   for (Serde serde : uniqueSerdes) {
+     String serdeName = serdeNames.computeIfAbsent(serde,
+         ignored -> serde.getClass().getSimpleName() + "-" + UUID.randomUUID().toString());
+     String instanceConfigKey = String.format(SerializerConfig.SERDE_SERIALIZED_INSTANCE(), serdeName);
+     String encodedSerde = encoder.encodeToString(serdeSerializer.toBytes(serde));
+     configs.putIfAbsent(instanceConfigKey, encodedSerde);
+   }
+
+   // point each stream's key serde config at the generated serde name
+   for (Map.Entry<String, Serde> keyEntry : keySerdes.entrySet()) {
+     String keySerdeConfigKey =
+         String.format(StreamConfig.STREAM_ID_PREFIX(), keyEntry.getKey()) + StreamConfig.KEY_SERDE();
+     configs.put(keySerdeConfigKey, serdeNames.get(keyEntry.getValue()));
+   }
+
+   // point each stream's message serde config at the generated serde name
+   for (Map.Entry<String, Serde> msgEntry : msgSerdes.entrySet()) {
+     String valueSerdeConfigKey =
+         String.format(StreamConfig.STREAM_ID_PREFIX(), msgEntry.getKey()) + StreamConfig.MSG_SERDE();
+     configs.put(valueSerdeConfigKey, serdeNames.get(msgEntry.getValue()));
+   }
+ }
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
index db6fd5a..7b93a9e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/MessageStreamImpl.java
@@ -29,12 +29,14 @@ import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpecs;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
+import org.apache.samza.serializers.KVSerde;
import java.time.Duration;
import java.util.Collection;
@@ -95,9 +97,9 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
}
@Override
- public <K, V> void sendTo(OutputStream<K, V, M> outputStream) {
+ public void sendTo(OutputStream<M> outputStream) {
OutputOperatorSpec<M> op = OperatorSpecs.createSendToOperatorSpec(
- (OutputStreamImpl<K, V, M>) outputStream, this.graph.getNextOpId());
+ (OutputStreamImpl<M>) outputStream, this.graph.getNextOpId());
this.operatorSpec.registerNextOperatorSpec(op);
}
@@ -133,17 +135,24 @@ public class MessageStreamImpl<M> implements MessageStream<M> {
}
@Override
- public <K> MessageStream<M> partitionBy(Function<? super M, ? extends K> keyExtractor) {
+ public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
+ Function<? super M, ? extends V> valueExtractor, KVSerde<K, V> serde) {
int opId = this.graph.getNextOpId();
String opName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), opId);
- IntermediateMessageStreamImpl<K, M, M> intermediateStream =
- this.graph.getIntermediateStream(opName, keyExtractor, m -> m, (k, m) -> m);
- OutputOperatorSpec<M> partitionByOperatorSpec = OperatorSpecs.createPartitionByOperatorSpec(
- intermediateStream.getOutputStream(), opId);
+ IntermediateMessageStreamImpl<KV<K, V>> intermediateStream = this.graph.getIntermediateStream(opName, serde);
+ PartitionByOperatorSpec<M, K, V> partitionByOperatorSpec =
+ OperatorSpecs.createPartitionByOperatorSpec(
+ intermediateStream.getOutputStream(), keyExtractor, valueExtractor, opId);
this.operatorSpec.registerNextOperatorSpec(partitionByOperatorSpec);
return intermediateStream;
}
+ @Override
+ public <K, V> MessageStream<KV<K, V>> partitionBy(Function<? super M, ? extends K> keyExtractor,
+ Function<? super M, ? extends V> valueExtractor) {
+ return partitionBy(keyExtractor, valueExtractor, null);
+ }
+
/**
* Get the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
* @return the {@link OperatorSpec} associated with this {@link MessageStreamImpl}.
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
index 2c2eb56..45378c7 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java
@@ -18,14 +18,20 @@
*/
package org.apache.samza.operators;
+import com.google.common.base.Preconditions;
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
-import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@@ -33,8 +39,6 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import java.util.stream.Collectors;
/**
@@ -42,6 +46,7 @@ import java.util.stream.Collectors;
* create the DAG of transforms.
*/
public class StreamGraphImpl implements StreamGraph {
+ private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class);
/**
* Unique identifier for each {@link org.apache.samza.operators.spec.OperatorSpec} in the graph.
@@ -55,6 +60,7 @@ public class StreamGraphImpl implements StreamGraph {
private final ApplicationRunner runner;
private final Config config;
+ private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde());
private ContextManager contextManager = null;
public StreamGraphImpl(ApplicationRunner runner, Config config) {
@@ -65,46 +71,51 @@ public class StreamGraphImpl implements StreamGraph {
}
@Override
- public <K, V, M> MessageStream<M> getInputStream(String streamId,
- BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
- if (msgBuilder == null) {
- throw new IllegalArgumentException("msgBuilder can't be null for an input stream");
- }
+ public void setDefaultSerde(Serde<?> serde) {
+ Preconditions.checkNotNull(serde, "Default serde must not be null");
+ Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(),
+ "Default serde must be set before creating any input or output streams.");
+ this.defaultSerde = serde;
+ }
- if (inputOperators.containsKey(runner.getStreamSpec(streamId))) {
- throw new IllegalStateException("getInputStream() invoked multiple times "
- + "with the same streamId: " + streamId);
- }
+ @Override
+ public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
+ Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
+ Preconditions.checkState(!inputOperators.containsKey(runner.getStreamSpec(streamId)),
+ "getInputStream must not be called multiple times with the same streamId: " + streamId);
StreamSpec streamSpec = runner.getStreamSpec(streamId);
+ KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+ boolean isKeyedInput = serde instanceof KVSerde;
inputOperators.put(streamSpec,
- new InputOperatorSpec<>(streamSpec, (BiFunction<K, V, M>) msgBuilder, this.getNextOpId()));
+ new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyedInput, this.getNextOpId()));
return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
}
@Override
- public <K, V, M> OutputStream<K, V, M> getOutputStream(String streamId,
- Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor) {
- if (keyExtractor == null) {
- throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
- }
-
- if (msgExtractor == null) {
- throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
- }
+ public <M> MessageStream<M> getInputStream(String streamId) {
+ return (MessageStream<M>) getInputStream(streamId, defaultSerde);
+ }
- if (outputStreams.containsKey(runner.getStreamSpec(streamId))) {
- throw new IllegalStateException("getOutputStream() invoked multiple times "
- + "with the same streamId: " + streamId);
- }
+ @Override
+ public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
+ Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
+ Preconditions.checkState(!outputStreams.containsKey(runner.getStreamSpec(streamId)),
+ "getOutputStream must not be called multiple times with the same streamId: " + streamId);
StreamSpec streamSpec = runner.getStreamSpec(streamId);
+ KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
outputStreams.put(streamSpec,
- new OutputStreamImpl<>(streamSpec, (Function<M, K>) keyExtractor, (Function<M, V>) msgExtractor));
+ new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), serde instanceof KVSerde));
return outputStreams.get(streamSpec);
}
@Override
+ public <M> OutputStream<M> getOutputStream(String streamId) {
+ return (OutputStream<M>) getOutputStream(streamId, defaultSerde);
+ }
+
+ @Override
public StreamGraph withContextManager(ContextManager contextManager) {
this.contextManager = contextManager;
return this;
@@ -116,38 +127,31 @@ public class StreamGraphImpl implements StreamGraph {
*
* @param streamName the name of the stream to be created. Will be prefixed with job name and id to generate the
* logical streamId.
- * @param keyExtractor the {@link Function} to extract the outgoing key from the intermediate message
- * @param msgExtractor the {@link Function} to extract the outgoing message from the intermediate message
- * @param msgBuilder the {@link BiFunction} to convert the incoming key and message to a message
- * in the intermediate {@link MessageStream}
- * @param <K> the type of key in the intermediate message
- * @param <V> the type of message in the intermediate message
+ * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
+ * is used.
* @param <M> the type of messages in the intermediate {@link MessageStream}
* @return the intermediate {@link MessageStreamImpl}
*/
- <K, V, M> IntermediateMessageStreamImpl<K, V, M> getIntermediateStream(String streamName,
- Function<? super M, ? extends K> keyExtractor, Function<? super M, ? extends V> msgExtractor,
- BiFunction<? super K, ? super V, ? extends M> msgBuilder) {
+ <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamName, Serde<M> serde) {
String streamId = String.format("%s-%s-%s",
config.get(JobConfig.JOB_NAME()),
config.get(JobConfig.JOB_ID(), "1"),
streamName);
- if (msgBuilder == null) {
- throw new IllegalArgumentException("msgBuilder cannot be null for an intermediate stream");
- }
- if (keyExtractor == null) {
- throw new IllegalArgumentException("keyExtractor can't be null for an output stream.");
- }
- if (msgExtractor == null) {
- throw new IllegalArgumentException("msgExtractor can't be null for an output stream.");
- }
StreamSpec streamSpec = runner.getStreamSpec(streamId);
- if (inputOperators.containsKey(streamSpec) || outputStreams.containsKey(streamSpec)) {
- throw new IllegalStateException("getIntermediateStream() invoked multiple times "
- + "with the same streamId: " + streamId);
+
+ Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
+ "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
+
+ if (serde == null) {
+ LOGGER.info("Using default serde for intermediate stream: " + streamId);
+ serde = (Serde<M>) defaultSerde;
}
- inputOperators.put(streamSpec, new InputOperatorSpec(streamSpec, msgBuilder, this.getNextOpId()));
- outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, keyExtractor, msgExtractor));
+
+ boolean isKeyed = serde instanceof KVSerde;
+ KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
+ inputOperators.put(streamSpec,
+ new InputOperatorSpec<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed, this.getNextOpId()));
+ outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
}
@@ -205,4 +209,27 @@ public class StreamGraphImpl implements StreamGraph {
return windowOrJoinSpecs.size() != 0;
}
+
+ private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
+ Serde keySerde, valueSerde;
+
+ if (serde instanceof KVSerde) {
+ keySerde = ((KVSerde) serde).getKeySerde();
+ valueSerde = ((KVSerde) serde).getValueSerde();
+ } else {
+ keySerde = new NoOpSerde();
+ valueSerde = serde;
+ }
+
+ if (keySerde instanceof NoOpSerde) {
+ LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
+ ". Keys will not be (de)serialized");
+ }
+ if (valueSerde instanceof NoOpSerde) {
+ LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
+ ". Values will not be (de)serialized");
+ }
+
+ return KV.of(keySerde, valueSerde);
+ }
}
[2/4] samza git commit: SAMZA-1109;
Allow setting Serdes for fluent API operators in code
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
index 2c8f682..8c75bca 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestExecutionPlanner.java
@@ -24,6 +24,7 @@ import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.TaskConfig;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
@@ -45,8 +46,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -122,11 +121,11 @@ public class TestExecutionPlanner {
*
*/
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- BiFunction mockBuilder = mock(BiFunction.class);
- streamGraph.getInputStream("input1", mockBuilder)
- .partitionBy(m -> "yes!!!").map(m -> m)
+ MessageStream<KV<Object, Object>> input1 = streamGraph.getInputStream("input1");
+ OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+ input1
+ .partitionBy(m -> m.key, m -> m.value)
+ .map(kv -> kv)
.sendTo(output1);
return streamGraph;
}
@@ -145,13 +144,20 @@ public class TestExecutionPlanner {
*/
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- BiFunction msgBuilder = mock(BiFunction.class);
- MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
- MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
- MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+ MessageStream<KV<Object, Object>> m1 =
+ streamGraph.<KV<Object, Object>>getInputStream("input1")
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> m2 =
+ streamGraph.<KV<Object, Object>>getInputStream("input2")
+ .partitionBy(m -> m.key, m -> m.value)
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> m3 =
+ streamGraph.<KV<Object, Object>>getInputStream("input3")
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value)
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+ OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(output1);
m3.join(m2, mock(JoinFunction.class), Duration.ofHours(1)).sendTo(output2);
@@ -162,21 +168,28 @@ public class TestExecutionPlanner {
private StreamGraphImpl createStreamGraphWithJoinAndWindow() {
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- BiFunction msgBuilder = mock(BiFunction.class);
- MessageStream m1 = streamGraph.getInputStream("input1", msgBuilder).map(m -> m);
- MessageStream m2 = streamGraph.getInputStream("input2", msgBuilder).partitionBy(m -> "haha").filter(m -> true);
- MessageStream m3 = streamGraph.getInputStream("input3", msgBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> output1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- OutputStream<Object, Object, Object> output2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+ MessageStream<KV<Object, Object>> m1 =
+ streamGraph.<KV<Object, Object>>getInputStream("input1")
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> m2 =
+ streamGraph.<KV<Object, Object>>getInputStream("input2")
+ .partitionBy(m -> m.key, m -> m.value)
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> m3 =
+ streamGraph.<KV<Object, Object>>getInputStream("input3")
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value)
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> output1 = streamGraph.getOutputStream("output1");
+ OutputStream<KV<Object, Object>> output2 = streamGraph.getOutputStream("output2");
m1.map(m -> m)
.filter(m->true)
- .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(8)));
m2.map(m -> m)
.filter(m->true)
- .window(Windows.<Object, Object>keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
+ .window(Windows.keyedTumblingWindow(m -> m, Duration.ofMillis(16)));
m1.join(m2, mock(JoinFunction.class), Duration.ofMillis(1600)).sendTo(output1);
m3.join(m2, mock(JoinFunction.class), Duration.ofMillis(100)).sendTo(output2);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
index 4bda86b..095e407 100644
--- a/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobGraphJsonGenerator.java
@@ -19,25 +19,26 @@
package org.apache.samza.execution;
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.function.BiFunction;
-import java.util.function.Function;
-
import org.apache.samza.config.Config;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemAdmin;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.Test;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
import static org.apache.samza.execution.TestExecutionPlanner.createSystemAdmin;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
@@ -105,13 +106,21 @@ public class TestJobGraphJsonGenerator {
StreamManager streamManager = new StreamManager(systemAdmins);
StreamGraphImpl streamGraph = new StreamGraphImpl(runner, config);
- BiFunction mockBuilder = mock(BiFunction.class);
- MessageStream m1 = streamGraph.getInputStream("input1", mockBuilder).map(m -> m);
- MessageStream m2 = streamGraph.getInputStream("input2", mockBuilder).partitionBy(m -> "haha").filter(m -> true);
- MessageStream m3 = streamGraph.getInputStream("input3", mockBuilder).filter(m -> true).partitionBy(m -> "hehe").map(m -> m);
- Function mockFn = mock(Function.class);
- OutputStream<Object, Object, Object> outputStream1 = streamGraph.getOutputStream("output1", mockFn, mockFn);
- OutputStream<Object, Object, Object> outputStream2 = streamGraph.getOutputStream("output2", mockFn, mockFn);
+ streamGraph.setDefaultSerde(KVSerde.of(new NoOpSerde<>(), new NoOpSerde<>()));
+ MessageStream<KV<Object, Object>> m1 =
+ streamGraph.<KV<Object, Object>>getInputStream("input1")
+ .map(m -> m);
+ MessageStream<KV<Object, Object>> m2 =
+ streamGraph.<KV<Object, Object>>getInputStream("input2")
+ .partitionBy(m -> m.key, m -> m.value)
+ .filter(m -> true);
+ MessageStream<KV<Object, Object>> m3 =
+ streamGraph.<KV<Object, Object>>getInputStream("input3")
+ .filter(m -> true)
+ .partitionBy(m -> m.key, m -> m.value)
+ .map(m -> m);
+ OutputStream<KV<Object, Object>> outputStream1 = streamGraph.getOutputStream("output1");
+ OutputStream<KV<Object, Object>> outputStream2 = streamGraph.getOutputStream("output2");
m1.join(m2, mock(JoinFunction.class), Duration.ofHours(2)).sendTo(outputStream1);
m2.sink((message, collector, coordinator) -> { });
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
new file mode 100644
index 0000000..c59c0cc
--- /dev/null
+++ b/samza-core/src/test/java/org/apache/samza/execution/TestJobNode.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.execution;
+
+import org.apache.samza.config.Config;
+import org.apache.samza.config.MapConfig;
+import org.apache.samza.config.SerializerConfig;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.OutputStream;
+import org.apache.samza.operators.StreamGraphImpl;
+import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.Serde;
+import org.apache.samza.serializers.SerializableSerde;
+import org.apache.samza.serializers.StringSerde;
+import org.apache.samza.system.StreamSpec;
+import org.junit.Test;
+
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+
+public class TestJobNode {
+
+ @Test
+ public void testAddSerdeConfigs() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec inputSpec = new StreamSpec("input", "input", "input-system");
+ StreamSpec outputSpec = new StreamSpec("output", "output", "output-system");
+ StreamSpec partitionBySpec = new StreamSpec("null-null-partition_by-1", "partition_by-1", "intermediate-system");
+ doReturn(inputSpec).when(mockRunner).getStreamSpec("input");
+ doReturn(outputSpec).when(mockRunner).getStreamSpec("output");
+ doReturn(partitionBySpec).when(mockRunner).getStreamSpec("null-null-partition_by-1");
+
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ streamGraph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>()));
+ MessageStream<KV<String, Object>> input = streamGraph.getInputStream("input");
+ OutputStream<KV<String, Object>> output = streamGraph.getOutputStream("output");
+ input.partitionBy(KV::getKey, KV::getValue).sendTo(output);
+
+ JobNode jobNode = new JobNode("jobName", "jobId", streamGraph, mock(Config.class));
+ StreamEdge inputEdge = new StreamEdge(inputSpec);
+ StreamEdge outputEdge = new StreamEdge(outputSpec);
+ StreamEdge repartitionEdge = new StreamEdge(partitionBySpec, true);
+ jobNode.addInEdge(inputEdge);
+ jobNode.addOutEdge(outputEdge);
+ jobNode.addInEdge(repartitionEdge);
+ jobNode.addOutEdge(repartitionEdge);
+
+ Map<String, String> configs = new HashMap<>();
+ jobNode.addSerdeConfigs(configs);
+
+ MapConfig mapConfig = new MapConfig(configs);
+ Config serializers = mapConfig.subset("serializers.registry.", true);
+
+ // make sure that the serializers deserialize correctly
+ SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
+ Map<String, Serde> deserializedSerdes = serializers.entrySet().stream().collect(Collectors.toMap(
+ e -> e.getKey().replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX(), ""),
+ e -> serializableSerde.fromBytes(Base64.getDecoder().decode(e.getValue().getBytes()))
+ ));
+ assertEquals(2, serializers.size());
+
+ String inputKeySerde = mapConfig.get("streams.input.samza.key.serde");
+ String inputMsgSerde = mapConfig.get("streams.input.samza.msg.serde");
+ assertTrue(deserializedSerdes.containsKey(inputKeySerde));
+ assertTrue(inputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue(deserializedSerdes.containsKey(inputMsgSerde));
+ assertTrue(inputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+ String outputKeySerde = mapConfig.get("streams.output.samza.key.serde");
+ String outputMsgSerde = mapConfig.get("streams.output.samza.msg.serde");
+ assertTrue(deserializedSerdes.containsKey(outputKeySerde));
+ assertTrue(outputKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue(deserializedSerdes.containsKey(outputMsgSerde));
+ assertTrue(outputMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+
+ String partitionByKeySerde = mapConfig.get("streams.null-null-partition_by-1.samza.key.serde");
+ String partitionByMsgSerde = mapConfig.get("streams.null-null-partition_by-1.samza.msg.serde");
+ assertTrue(deserializedSerdes.containsKey(partitionByKeySerde));
+ assertTrue(partitionByKeySerde.startsWith(StringSerde.class.getSimpleName()));
+ assertTrue(deserializedSerdes.containsKey(partitionByMsgSerde));
+ assertTrue(partitionByMsgSerde.startsWith(JsonSerdeV2.class.getSimpleName()));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
index c51b1ea..004c5cf 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestJoinOperator.java
@@ -25,6 +25,8 @@ import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
@@ -276,10 +278,11 @@ public class TestJoinOperator {
@Override
public void init(StreamGraph graph, Config config) {
+ KVSerde<Integer, Integer> kvSerde = KVSerde.of(new IntegerSerde(), new IntegerSerde());
MessageStream<FirstStreamIME> inStream =
- graph.getInputStream("instream", FirstStreamIME::new);
+ graph.getInputStream("instream", kvSerde).map(FirstStreamIME::new);
MessageStream<SecondStreamIME> inStream2 =
- graph.getInputStream("instream2", SecondStreamIME::new);
+ graph.getInputStream("instream2", kvSerde).map(SecondStreamIME::new);
SystemStream outputSystemStream = new SystemStream("outputSystem", "outputStream");
inStream
@@ -330,14 +333,24 @@ public class TestJoinOperator {
}
private static class FirstStreamIME extends IncomingMessageEnvelope {
+ FirstStreamIME(KV<Integer, Integer> message) {
+ super(new SystemStreamPartition(
+ "insystem", "instream", new Partition(0)), "1", message.getKey(), message.getValue());
+ }
+
FirstStreamIME(Integer key, Integer message) {
- super(new SystemStreamPartition("insystem", "instream", new Partition(0)), "1", key, message);
+ this(KV.of(key, message));
}
}
private static class SecondStreamIME extends IncomingMessageEnvelope {
+ SecondStreamIME(KV<Integer, Integer> message) {
+ super(new SystemStreamPartition(
+ "insystem2", "instream2", new Partition(0)), "1", message.getKey(), message.getValue());
+ }
+
SecondStreamIME(Integer key, Integer message) {
- super(new SystemStreamPartition("insystem2", "instream2", new Partition(0)), "1", key, message);
+ this(KV.of(key, message));
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
index 61224f2..c6554bc 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestMessageStreamImpl.java
@@ -32,6 +32,7 @@ import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
@@ -39,20 +40,19 @@ import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
+import org.apache.samza.serializers.KVSerde;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
-import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
@@ -172,9 +172,8 @@ public class TestMessageStreamImpl {
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
-
- OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
- inputStream.sendTo(mockOutputOpSpec);
+ OutputStreamImpl<TestMessageEnvelope> mockOutputStreamImpl = mock(OutputStreamImpl.class);
+ inputStream.sendTo(mockOutputStreamImpl);
ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
@@ -182,33 +181,75 @@ public class TestMessageStreamImpl {
assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
assertEquals(OpCode.SEND_TO, registeredOpSpec.getOpCode());
- assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+ assertEquals(mockOutputStreamImpl, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+
+ // Same behavior as above, so there is nothing new to assert; this only ensures that the keyed variant compiles.
+ MessageStreamImpl<KV<String, TestMessageEnvelope>> keyedInputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+ OutputStreamImpl<KV<String, TestMessageEnvelope>> mockKeyedOutputStreamImpl = mock(OutputStreamImpl.class);
+ keyedInputStream.sendTo(mockKeyedOutputStreamImpl);
+
+ // This can't be unit tested, but the following variants should not compile:
+// inputStream.sendTo(mockKeyedOutputStreamImpl);
+// keyedInputStream.sendTo(mockOutputStreamImpl);
}
@Test
- public void testPartitionBy() {
+ public void testRepartition() {
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
- Function<TestMessageEnvelope, String> mockKeyFn = mock(Function.class);
- OutputStreamImpl mockOutputOpSpec = mock(OutputStreamImpl.class);
+ OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
+ KVSerde mockKVSerde = mock(KVSerde.class);
IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
- when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKeyFn), any(Function.class), any(BiFunction.class)))
+ when(mockGraph.getIntermediateStream(eq(streamName), eq(mockKVSerde)))
.thenReturn(mockIntermediateStream);
when(mockIntermediateStream.getOutputStream())
- .thenReturn(mockOutputOpSpec);
+ .thenReturn(mockOutputStreamImpl);
MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
- inputStream.partitionBy(mockKeyFn);
+ Function mockKeyFunction = mock(Function.class);
+ Function mockValueFunction = mock(Function.class);
+ inputStream.partitionBy(mockKeyFunction, mockValueFunction, mockKVSerde);
ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
- assertTrue(registeredOpSpec instanceof OutputOperatorSpec);
+ assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec);
+ assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
+ assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream());
+ assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction());
+ assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction());
+ }
+
+ @Test
+ public void testRepartitionWithoutSerde() {
+ StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
+ OperatorSpec mockOpSpec = mock(OperatorSpec.class);
+
+ String streamName = String.format("%s-%s", OperatorSpec.OpCode.PARTITION_BY.name().toLowerCase(), 0);
+ OutputStreamImpl mockOutputStreamImpl = mock(OutputStreamImpl.class);
+ IntermediateMessageStreamImpl mockIntermediateStream = mock(IntermediateMessageStreamImpl.class);
+ when(mockGraph.getIntermediateStream(eq(streamName), eq(null)))
+ .thenReturn(mockIntermediateStream);
+ when(mockIntermediateStream.getOutputStream())
+ .thenReturn(mockOutputStreamImpl);
+
+ MessageStreamImpl<TestMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
+ Function mockKeyFunction = mock(Function.class);
+ Function mockValueFunction = mock(Function.class);
+ inputStream.partitionBy(mockKeyFunction, mockValueFunction);
+
+ ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
+ verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
+ OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
+
+ assertTrue(registeredOpSpec instanceof PartitionByOperatorSpec);
assertEquals(OpCode.PARTITION_BY, registeredOpSpec.getOpCode());
- assertEquals(mockOutputOpSpec, ((OutputOperatorSpec) registeredOpSpec).getOutputStream());
+ assertEquals(mockOutputStreamImpl, ((PartitionByOperatorSpec) registeredOpSpec).getOutputStream());
+ assertEquals(mockKeyFunction, ((PartitionByOperatorSpec) registeredOpSpec).getKeyFunction());
+ assertEquals(mockValueFunction, ((PartitionByOperatorSpec) registeredOpSpec).getValueFunction());
}
@Test
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
index 1fc60bd..45583c2 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestStreamGraphImpl.java
@@ -27,36 +27,136 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.stream.IntermediateMessageStreamImpl;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestStreamGraphImpl {
@Test
- public void testGetInputStream() {
+ public void testGetInputStreamWithValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+ Serde mockValueSerde = mock(Serde.class);
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde);
- InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
(InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithKeyValueSerde() {
+ // Verifies that an explicit KVSerde surfaces its key and value serdes on the input operator spec.
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
+
+ Serde keySerde = mock(Serde.class);
+ Serde valueSerde = mock(Serde.class);
+ KVSerde kvSerde = mock(KVSerde.class);
+ doReturn(keySerde).when(kvSerde).getKeySerde();
+ doReturn(valueSerde).when(kvSerde).getValueSerde();
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
+ MessageStream<TestMessageEnvelope> stream = graph.getInputStream("test-stream-1", kvSerde);
+
+ MessageStreamImpl<TestMessageEnvelope> streamImpl = (MessageStreamImpl<TestMessageEnvelope>) stream;
+ InputOperatorSpec<String, TestMessageEnvelope> opSpec = (InputOperatorSpec) streamImpl.getOperatorSpec();
+ assertEquals(OpCode.INPUT, opSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(streamSpec), opSpec);
+ assertEquals(streamSpec, opSpec.getStreamSpec());
+ assertEquals(keySerde, opSpec.getKeySerde());
+ assertEquals(valueSerde, opSpec.getValueSerde());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetInputStreamWithNullSerde() {
+ // Requesting an input stream with an explicit null serde is expected to raise a NullPointerException.
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
+ graph.getInputStream("test-stream-1", null);
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultValueSerde() {
+ // Verifies that a plain (non-KV) default serde becomes the value serde, with a NoOpSerde for the key.
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
+ Serde defaultValueSerde = mock(Serde.class);
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
+ graph.setDefaultSerde(defaultValueSerde);
+ MessageStream<TestMessageEnvelope> stream = graph.getInputStream("test-stream-1");
+
+ MessageStreamImpl<TestMessageEnvelope> streamImpl = (MessageStreamImpl<TestMessageEnvelope>) stream;
+ InputOperatorSpec<String, TestMessageEnvelope> opSpec = (InputOperatorSpec) streamImpl.getOperatorSpec();
+ assertEquals(OpCode.INPUT, opSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(streamSpec), opSpec);
+ assertEquals(streamSpec, opSpec.getStreamSpec());
+ assertTrue(opSpec.getKeySerde() instanceof NoOpSerde);
+ assertEquals(defaultValueSerde, opSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultKeyValueSerde() {
+ // Verifies that a default KVSerde supplies both the key and the value serde of the input operator spec.
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
+
+ Serde keySerde = mock(Serde.class);
+ Serde valueSerde = mock(Serde.class);
+ KVSerde defaultKVSerde = mock(KVSerde.class);
+ doReturn(keySerde).when(defaultKVSerde).getKeySerde();
+ doReturn(valueSerde).when(defaultKVSerde).getValueSerde();
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
+ graph.setDefaultSerde(defaultKVSerde);
+ MessageStream<TestMessageEnvelope> stream = graph.getInputStream("test-stream-1");
+
+ MessageStreamImpl<TestMessageEnvelope> streamImpl = (MessageStreamImpl<TestMessageEnvelope>) stream;
+ InputOperatorSpec<String, TestMessageEnvelope> opSpec = (InputOperatorSpec) streamImpl.getOperatorSpec();
+ assertEquals(OpCode.INPUT, opSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(streamSpec), opSpec);
+ assertEquals(streamSpec, opSpec.getStreamSpec());
+ assertEquals(keySerde, opSpec.getKeySerde());
+ assertEquals(valueSerde, opSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetInputStreamWithDefaultDefaultSerde() {
+ // default default serde == user hasn't provided a default serde
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
+
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
+ (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
+ assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
+ assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertTrue(inputOpSpec.getValueSerde() instanceof NoOpSerde);
}
@Test
@@ -65,15 +165,13 @@ public class TestStreamGraphImpl {
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
- MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockMsgBuilder);
+ MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
- InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec =
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
(InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
- assertEquals(mockMsgBuilder, inputOpSpec.getMsgBuilder());
assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
}
@@ -86,12 +184,12 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("test-stream-2")).thenReturn(mockStreamSpec2);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1", mock(BiFunction.class));
- MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2", mock(BiFunction.class));
+ MessageStream<Object> inputStream1 = graph.getInputStream("test-stream-1");
+ MessageStream<Object> inputStream2 = graph.getInputStream("test-stream-2");
- InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec1 =
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec1 =
(InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream1).getOperatorSpec();
- InputOperatorSpec<String, String, TestMessageEnvelope> inputOpSpec2 =
+ InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec2 =
(InputOperatorSpec) ((MessageStreamImpl<Object>) inputStream2).getOperatorSpec();
assertEquals(graph.getInputOperators().size(), 2);
@@ -105,29 +203,149 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getInputStream("test-stream-1", mock(BiFunction.class));
- graph.getInputStream("test-stream-1", mock(BiFunction.class)); // should throw exception
+ graph.getInputStream("test-stream-1");
+ // should throw exception
+ graph.getInputStream("test-stream-1");
+ }
+
+ @Test
+ public void testGetOutputStreamWithValueSerde() {
+ // Verifies that an explicit plain serde becomes the value serde, with a NoOpSerde for the key.
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
+ Serde valueSerde = mock(Serde.class);
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
+ OutputStream<TestMessageEnvelope> stream = graph.getOutputStream("test-stream-1", valueSerde);
+
+ OutputStreamImpl<TestMessageEnvelope> streamImpl = (OutputStreamImpl) stream;
+ assertEquals(graph.getOutputStreams().get(streamSpec), streamImpl);
+ assertEquals(streamSpec, streamImpl.getStreamSpec());
+ assertTrue(streamImpl.getKeySerde() instanceof NoOpSerde);
+ assertEquals(valueSerde, streamImpl.getValueSerde());
+ }
+
+ @Test
+ public void testGetOutputStreamWithKeyValueSerde() {
+ // Verifies that an explicitly supplied KVSerde provides the key and value serdes of the output stream.
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ // Deliberately no graph.setDefaultSerde(...) here: if the same serde were also installed as the default,
+ // the assertions below could not tell the explicit-serde path apart from the default-serde fallback.
+ OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde);
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
+ }
+
+ @Test(expected = NullPointerException.class)
+ public void testGetOutputStreamWithNullSerde() {
+ // Requesting an output stream with an explicit null serde is expected to raise a NullPointerException.
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
+ graph.getOutputStream("test-stream-1", null);
+ }
+
+ @Test
+ public void testGetOutputStreamWithDefaultValueSerde() {
+ // Verifies that a plain (non-KV) default serde becomes the value serde, with a NoOpSerde for the key.
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
+ Serde defaultValueSerde = mock(Serde.class);
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
+ graph.setDefaultSerde(defaultValueSerde);
+ OutputStream<TestMessageEnvelope> stream = graph.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> streamImpl = (OutputStreamImpl) stream;
+ assertEquals(graph.getOutputStreams().get(streamSpec), streamImpl);
+ assertEquals(streamSpec, streamImpl.getStreamSpec());
+ assertTrue(streamImpl.getKeySerde() instanceof NoOpSerde);
+ assertEquals(defaultValueSerde, streamImpl.getValueSerde());
+ }
+
+ @Test
+ public void testGetOutputStreamWithDefaultKeyValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graph.setDefaultSerde(mockKVSerde);
+
+ OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
+ assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
@Test
- public void testGetOutputStream() {
+ public void testGetOutputStreamWithDefaultDefaultSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+
+ OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
+
+ OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
+ assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
+ assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
+ assertTrue(outputStreamImpl.getValueSerde() instanceof NoOpSerde);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingStreams() {
+ // Covers the input-stream case: setting a default serde after an input stream was requested must fail.
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("test-stream-1")).thenReturn(streamSpec);
+ StreamGraphImpl graph = new StreamGraphImpl(runner, mock(Config.class));
+
+ graph.getInputStream("test-stream-1");
+ Serde lateDefaultSerde = mock(Serde.class);
+ // expected to throw IllegalStateException
+ graph.setDefaultSerde(lateDefaultSerde);
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingOutputStream() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
- Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
+ graph.getOutputStream("test-stream-1");
+ graph.setDefaultSerde(mock(Serde.class)); // should throw exception
+ }
- OutputStream<String, String, TestMessageEnvelope> outputStream =
- graph.getOutputStream("test-stream-1", mockKeyExtractor, mockMsgExtractor);
+ @Test(expected = IllegalStateException.class)
+ public void testSetDefaultSerdeAfterGettingIntermediateStream() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
- OutputStreamImpl<String, String, TestMessageEnvelope> outputOpSpec = (OutputStreamImpl) outputStream;
- assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputOpSpec);
- assertEquals(mockKeyExtractor, outputOpSpec.getKeyExtractor());
- assertEquals(mockMsgExtractor, outputOpSpec.getMsgExtractor());
- assertEquals(mockStreamSpec, outputOpSpec.getStreamSpec());
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ graph.getIntermediateStream("test-stream-1", null);
+ graph.setDefaultSerde(mock(Serde.class)); // should throw exception
}
@Test(expected = IllegalStateException.class)
@@ -136,12 +354,89 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class));
- graph.getOutputStream("test-stream-1", mock(Function.class), mock(Function.class)); // should throw exception
+ graph.getOutputStream("test-stream-1");
+ graph.getOutputStream("test-stream-1"); // should throw exception
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithValueSerde() {
+ // Verifies that an explicit plain serde is used as the value serde on both the output side and the
+ // input side of the intermediate stream, with a NoOpSerde for the key on both sides.
+ Config config = mock(Config.class);
+ when(config.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+ when(config.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(streamSpec);
+ Serde valueSerde = mock(Serde.class);
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, config);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediate =
+ graph.getIntermediateStream("test-stream-1", valueSerde);
+
+ assertEquals(graph.getInputOperators().get(streamSpec), intermediate.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(streamSpec), intermediate.getOutputStream());
+ assertEquals(streamSpec, intermediate.getStreamSpec());
+ // output side
+ assertTrue(intermediate.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertEquals(valueSerde, intermediate.getOutputStream().getValueSerde());
+ // input side
+ InputOperatorSpec inputOpSpec = (InputOperatorSpec) intermediate.getOperatorSpec();
+ assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
+ assertEquals(valueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithKeyValueSerde() {
+ // Verifies that an explicit KVSerde supplies the key and value serdes on both the output side and the
+ // input side of the intermediate stream.
+ Config config = mock(Config.class);
+ when(config.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+ when(config.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ StreamSpec streamSpec = mock(StreamSpec.class);
+ ApplicationRunner runner = mock(ApplicationRunner.class);
+ when(runner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(streamSpec);
+
+ Serde keySerde = mock(Serde.class);
+ Serde valueSerde = mock(Serde.class);
+ KVSerde kvSerde = mock(KVSerde.class);
+ doReturn(keySerde).when(kvSerde).getKeySerde();
+ doReturn(valueSerde).when(kvSerde).getValueSerde();
+
+ StreamGraphImpl graph = new StreamGraphImpl(runner, config);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediate =
+ graph.getIntermediateStream("test-stream-1", kvSerde);
+
+ assertEquals(graph.getInputOperators().get(streamSpec), intermediate.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(streamSpec), intermediate.getOutputStream());
+ assertEquals(streamSpec, intermediate.getStreamSpec());
+ // output side
+ assertEquals(keySerde, intermediate.getOutputStream().getKeySerde());
+ assertEquals(valueSerde, intermediate.getOutputStream().getValueSerde());
+ // input side
+ InputOperatorSpec inputOpSpec = (InputOperatorSpec) intermediate.getOperatorSpec();
+ assertEquals(keySerde, inputOpSpec.getKeySerde());
+ assertEquals(valueSerde, inputOpSpec.getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithDefaultValueSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+ when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+
+ Serde mockValueSerde = mock(Serde.class);
+ graph.setDefaultSerde(mockValueSerde);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", null);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
@Test
- public void testGetIntermediateStream() {
+ public void testGetIntermediateStreamWithDefaultKeyValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
@@ -150,19 +445,45 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
- Function<TestMessageEnvelope, String> mockKeyExtractor = mock(Function.class);
- Function<TestMessageEnvelope, String> mockMsgExtractor = mock(Function.class);
- BiFunction<String, String, TestMessageEnvelope> mockMsgBuilder = mock(BiFunction.class);
- IntermediateMessageStreamImpl<?, ?, TestMessageEnvelope> intermediateStreamImpl =
- graph.getIntermediateStream("test-stream-1", mockKeyExtractor, mockMsgExtractor, mockMsgBuilder);
+ KVSerde mockKVSerde = mock(KVSerde.class);
+ Serde mockKeySerde = mock(Serde.class);
+ Serde mockValueSerde = mock(Serde.class);
+ doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
+ doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
+ graph.setDefaultSerde(mockKVSerde);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", null);
+
+ assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
+ assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
+ assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
+ assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
+ assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
+ assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
+ assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
+ }
+
+ @Test
+ public void testGetIntermediateStreamWithDefaultDefaultSerde() {
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ Config mockConfig = mock(Config.class);
+ StreamSpec mockStreamSpec = mock(StreamSpec.class);
+ when(mockConfig.get(JobConfig.JOB_NAME())).thenReturn("myJob");
+ when(mockConfig.get(JobConfig.JOB_ID(), "1")).thenReturn("i001");
+ when(mockRunner.getStreamSpec("myJob-i001-test-stream-1")).thenReturn(mockStreamSpec);
+
+ StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
+ IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
+ graph.getIntermediateStream("test-stream-1", null);
assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
- assertEquals(mockKeyExtractor, intermediateStreamImpl.getOutputStream().getKeyExtractor());
- assertEquals(mockMsgExtractor, intermediateStreamImpl.getOutputStream().getMsgExtractor());
- assertEquals(mockMsgBuilder, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getMsgBuilder());
+ assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
+ assertTrue(intermediateStreamImpl.getOutputStream().getValueSerde() instanceof NoOpSerde);
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
+ assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde() instanceof NoOpSerde);
}
@Test(expected = IllegalStateException.class)
@@ -171,8 +492,8 @@ public class TestStreamGraphImpl {
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mock(StreamSpec.class));
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
- graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
- graph.getIntermediateStream("test-stream-1", mock(Function.class), mock(Function.class), mock(BiFunction.class));
+ graph.getIntermediateStream("test-stream-1", mock(Serde.class));
+ graph.getIntermediateStream("test-stream-1", mock(Serde.class));
}
@Test
@@ -199,9 +520,9 @@ public class TestStreamGraphImpl {
StreamSpec testStreamSpec3 = new StreamSpec("test-stream-3", "physical-stream-3", "test-system");
when(mockRunner.getStreamSpec("test-stream-3")).thenReturn(testStreamSpec3);
- graph.getInputStream("test-stream-1", (k, v) -> v);
- graph.getInputStream("test-stream-2", (k, v) -> v);
- graph.getInputStream("test-stream-3", (k, v) -> v);
+ graph.getInputStream("test-stream-1");
+ graph.getInputStream("test-stream-2");
+ graph.getInputStream("test-stream-3");
List<InputOperatorSpec> inputSpecs = new ArrayList<>(graph.getInputOperators().values());
Assert.assertEquals(inputSpecs.size(), 3);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
index ca8a151..ee44cf9 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/TestWindowOperator.java
@@ -34,6 +34,8 @@ import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.StreamSpec;
@@ -389,8 +391,9 @@ public class TestWindowOperator {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
- (k, m) -> new IntegerEnvelope((Integer) k));
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
inStream
.map(m -> m)
@@ -418,8 +421,9 @@ public class TestWindowOperator {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
- (k, m) -> new IntegerEnvelope((Integer) k));
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
inStream
.map(m -> m)
@@ -444,8 +448,9 @@ public class TestWindowOperator {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<IntegerEnvelope> inStream = graph.getInputStream("integers",
- (k, m) -> new IntegerEnvelope((Integer) k));
+ MessageStream<IntegerEnvelope> inStream =
+ graph.getInputStream("integers", KVSerde.of(new IntegerSerde(), new IntegerSerde()))
+ .map(kv -> new IntegerEnvelope(kv.getKey()));
Function<IntegerEnvelope, Integer> keyFn = m -> (Integer) m.getKey();
inStream
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
index 4505eef..68b4ce0 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestOperatorImplGraph.java
@@ -19,9 +19,9 @@
package org.apache.samza.operators.impl;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistryMap;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
@@ -30,6 +30,10 @@ import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.MapFunction;
import org.apache.samza.operators.spec.OperatorSpec.OpCode;
import org.apache.samza.runtime.ApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.NoOpSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.system.StreamSpec;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
@@ -43,8 +47,6 @@ import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
-import java.util.function.BiFunction;
-import java.util.function.Function;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame;
@@ -73,9 +75,8 @@ public class TestOperatorImplGraph {
when(mockRunner.getStreamSpec(eq("output"))).thenReturn(mock(StreamSpec.class));
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
- OutputStream<Object, Object, Object> outputStream =
- streamGraph.getOutputStream("output", mock(Function.class), mock(Function.class));
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+ OutputStream<Object> outputStream = streamGraph.getOutputStream("output");
inputStream
.filter(mock(FilterFunction.class))
@@ -104,12 +105,49 @@ public class TestOperatorImplGraph {
}
@Test
+ public void testPartitionByChain() { // verifies the operator impl chain built for: input -> partitionBy -> sendTo(output)
+ ApplicationRunner mockRunner = mock(ApplicationRunner.class);
+ when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
+ when(mockRunner.getStreamSpec(eq("output"))).thenReturn(new StreamSpec("output", "output-stream", "output-system"));
+ when(mockRunner.getStreamSpec(eq("null-null-partition_by-1")))
+ .thenReturn(new StreamSpec("intermediate", "intermediate-stream", "intermediate-system")); // NOTE(review): presumably the id generated for the partitionBy intermediate stream; the "null-null" prefix looks like it comes from the mocked Config - confirm
+ StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input");
+ OutputStream<KV<Integer, String>> outputStream = streamGraph
+ .getOutputStream("output", KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)));
+ // Wire up the graph under test: key each message by hashCode(), value it by toString(), then send to "output".
+ inputStream
+ .partitionBy(Object::hashCode, Object::toString, KVSerde.of(mock(IntegerSerde.class), mock(StringSerde.class)))
+ .sendTo(outputStream);
+ // Build the operator implementations for the graph; the task context is mocked and only supplies a metrics registry.
+ TaskContext mockTaskContext = mock(TaskContext.class);
+ when(mockTaskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
+ OperatorImplGraph opImplGraph =
+ new OperatorImplGraph(streamGraph, mock(Config.class), mockTaskContext, mock(Clock.class));
+ // The original input must feed exactly one downstream operator.
+ InputOperatorImpl inputOpImpl = opImplGraph.getInputOperator(new SystemStream("input-system", "input-stream"));
+ assertEquals(1, inputOpImpl.registeredOperators.size());
+ // The cast doubles as a type check: a ClassCastException here means the downstream operator is not a partitionBy impl.
+ OperatorImpl partitionByOpImpl = (PartitionByOperatorImpl) inputOpImpl.registeredOperators.iterator().next();
+ assertEquals(0, partitionByOpImpl.registeredOperators.size()); // is terminal but paired with an input operator
+ assertEquals(OpCode.PARTITION_BY, partitionByOpImpl.getOperatorSpec().getOpCode());
+ // The stream stubbed as "intermediate" must be registered as an input of its own, with one downstream operator.
+ InputOperatorImpl repartitionedInputOpImpl =
+ opImplGraph.getInputOperator(new SystemStream("intermediate-system", "intermediate-stream"));
+ assertEquals(1, repartitionedInputOpImpl.registeredOperators.size());
+ // That downstream operator must be the terminal sendTo (again, the cast acts as the type check).
+ OperatorImpl sendToOpImpl = (OutputOperatorImpl) repartitionedInputOpImpl.registeredOperators.iterator().next();
+ assertEquals(0, sendToOpImpl.registeredOperators.size());
+ assertEquals(OpCode.SEND_TO, sendToOpImpl.getOperatorSpec().getOpCode());
+ }
+
+ @Test
public void testBroadcastChain() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input");
inputStream.filter(mock(FilterFunction.class));
inputStream.map(mock(MapFunction.class));
@@ -132,7 +170,7 @@ public class TestOperatorImplGraph {
when(mockRunner.getStreamSpec(eq("input"))).thenReturn(new StreamSpec("input", "input-stream", "input-system"));
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
- MessageStream<Object> inputStream = streamGraph.getInputStream("input", mock(BiFunction.class));
+ MessageStream<Object> inputStream = streamGraph.getInputStream("input");
MessageStream<Object> stream1 = inputStream.filter(mock(FilterFunction.class));
MessageStream<Object> stream2 = inputStream.map(mock(MapFunction.class));
MessageStream<Object> mergedStream = stream1.merge(Collections.singleton(stream2));
@@ -156,8 +194,8 @@ public class TestOperatorImplGraph {
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mock(Config.class));
JoinFunction mockJoinFunction = mock(JoinFunction.class);
- MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
- MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+ MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", new NoOpSerde<>());
+ MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", new NoOpSerde<>());
inputStream1.join(inputStream2, mockJoinFunction, Duration.ofHours(1));
TaskContext mockTaskContext = mock(TaskContext.class);
@@ -182,13 +220,13 @@ public class TestOperatorImplGraph {
// verify that left partial join operator calls getFirstKey
Object mockLeftMessage = mock(Object.class);
when(mockJoinFunction.getFirstKey(eq(mockLeftMessage))).thenReturn(joinKey);
- inputOpImpl1.onMessage(Pair.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+ inputOpImpl1.onMessage(KV.of("", mockLeftMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
verify(mockJoinFunction, times(1)).getFirstKey(mockLeftMessage);
// verify that right partial join operator calls getSecondKey
Object mockRightMessage = mock(Object.class);
when(mockJoinFunction.getSecondKey(eq(mockRightMessage))).thenReturn(joinKey);
- inputOpImpl2.onMessage(Pair.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
+ inputOpImpl2.onMessage(KV.of("", mockRightMessage), mock(MessageCollector.class), mock(TaskCoordinator.class));
verify(mockJoinFunction, times(1)).getSecondKey(mockRightMessage);
// verify that the join function apply is called with the correct messages on match
@@ -205,8 +243,8 @@ public class TestOperatorImplGraph {
when(mockContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
StreamGraphImpl streamGraph = new StreamGraphImpl(mockRunner, mockConfig);
- MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1", (k, v) -> v);
- MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2", (k, v) -> v);
+ MessageStream<Object> inputStream1 = streamGraph.getInputStream("input1");
+ MessageStream<Object> inputStream2 = streamGraph.getInputStream("input2");
List<String> initializedOperators = new ArrayList<>();
List<String> closedOperators = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
index e183d87..a91c1af 100644
--- a/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
+++ b/samza-core/src/test/java/org/apache/samza/operators/impl/TestStreamOperatorImpl.java
@@ -41,7 +41,7 @@ public class TestStreamOperatorImpl {
@Test
@SuppressWarnings("unchecked")
- public void testSimpleOperator() {
+ public void testStreamOperator() {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
@@ -61,7 +61,7 @@ public class TestStreamOperatorImpl {
}
@Test
- public void testSimpleOperatorClose() {
+ public void testStreamOperatorClose() {
StreamOperatorSpec<TestMessageEnvelope, TestOutputMessageEnvelope> mockOp = mock(StreamOperatorSpec.class);
FlatMapFunction<TestMessageEnvelope, TestOutputMessageEnvelope> txfmFn = mock(FlatMapFunction.class);
when(mockOp.getTransformFn()).thenReturn(txfmFn);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
deleted file mode 100644
index eddfb0a..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteBufferSerde.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-import org.junit.Assert._
-import org.junit.Test
-import java.nio.ByteBuffer
-
-class TestByteBufferSerde {
- @Test
- def testSerde {
- val serde = new ByteBufferSerde
- assertNull(serde.toBytes(null))
- assertNull(serde.fromBytes(null))
-
- val bytes = "A lazy way of creating a byte array".getBytes()
- val byteBuffer = ByteBuffer.wrap(bytes)
- byteBuffer.mark()
- assertArrayEquals(serde.toBytes(byteBuffer), bytes)
- byteBuffer.reset()
- assertEquals(serde.fromBytes(bytes), byteBuffer)
- }
-
- @Test
- def testSerializationPreservesInput {
- val serde = new ByteBufferSerde
- val bytes = "A lazy way of creating a byte array".getBytes()
- val byteBuffer = ByteBuffer.wrap(bytes)
- byteBuffer.get() // advance position by 1
- serde.toBytes(byteBuffer)
-
- assertEquals(byteBuffer.capacity(), byteBuffer.limit())
- assertEquals(1, byteBuffer.position())
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
deleted file mode 100644
index f605762..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestByteSerde.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestByteSerde {
- @Test
- def testByteSerde {
- val serde = new ByteSerde
- assertNull(serde.toBytes(null))
- assertNull(serde.fromBytes(null))
-
- val testBytes = "A lazy way of creating a byte array".getBytes()
- assertArrayEquals(serde.toBytes(testBytes), testBytes)
- assertArrayEquals(serde.fromBytes(testBytes), testBytes)
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
deleted file mode 100644
index 60241b5..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestDoubleSerde.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestDoubleSerde {
- @Test
- def testDoubleSerde {
- val serde = new DoubleSerde
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
-
- val fooBar = 9.156013e-002
- val fooBarBytes = serde.toBytes(fooBar)
- fooBarBytes.foreach(System.err.println)
- assertArrayEquals(Array[Byte](63, -73, 112, 124, 19, -9, -82, -93), fooBarBytes)
- assertEquals(fooBar, serde.fromBytes(fooBarBytes))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
deleted file mode 100644
index a3e7e1f..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestIntegerSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestIntegerSerde {
- @Test
- def testIntegerSerde {
- val serde = new IntegerSerde
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
-
- val fooBar = 37
- val fooBarBytes = serde.toBytes(fooBar)
- assertArrayEquals(Array[Byte](0, 0, 0, 37), fooBarBytes)
- assertEquals(fooBar, serde.fromBytes(fooBarBytes))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
deleted file mode 100644
index 77a7498..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestLongSerde.scala
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.util.Arrays
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestLongSerde {
- @Test
- def testLongSerde {
- val serde = new LongSerde
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
-
- val fooBar = 1234123412341234L
- val fooBarBytes = serde.toBytes(fooBar)
- fooBarBytes.foreach(System.err.println)
- assertArrayEquals(Array[Byte](0, 4, 98, 109, -65, -102, 1, -14), fooBarBytes)
- assertEquals(fooBar, serde.fromBytes(fooBarBytes))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
deleted file mode 100644
index 8e899d0..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestSerializableSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestSerializableSerde {
- @Test
- def testSerializableSerde {
- val serde = new SerializableSerde[String]
- assertNull(serde.toBytes(null))
- assertNull(serde.fromBytes(null))
-
- val obj = "String is serializable"
-
- // Serialized string is prefix + string itself
- val prefix = Array(0xAC, 0xED, 0x00, 0x05, 0x74, 0x00, 0x16).map(_.toByte)
- val expected = (prefix ++ obj.getBytes("UTF-8"))
-
- val bytes = serde.toBytes(obj)
-
- assertArrayEquals(expected, bytes)
-
- val objRoundTrip:String = serde.fromBytes(bytes)
- assertEquals(obj, objRoundTrip)
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
deleted file mode 100644
index a1e8e88..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestStringSerde.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestStringSerde {
- @Test
- def testStringSerde {
- val serde = new StringSerde("UTF-8")
- assertEquals(null, serde.toBytes(null))
- assertEquals(null, serde.fromBytes(null))
-
- val fooBar = "foo bar"
- val fooBarBytes = serde.toBytes(fooBar)
- assertArrayEquals(fooBar.getBytes("UTF-8"), fooBarBytes)
- assertEquals(fooBar, serde.fromBytes(fooBarBytes))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala b/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
deleted file mode 100644
index 04ddcdb..0000000
--- a/samza-core/src/test/scala/org/apache/samza/serializers/TestUUIDSerde.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.BufferUnderflowException
-import java.util.UUID
-
-import org.junit.Assert._
-import org.junit.Test
-
-class TestUUIDSerde {
- private val serde = new UUIDSerde
-
- @Test
- def testUUIDSerde {
- val uuid = new UUID(13, 42)
- val bytes = serde.toBytes(uuid)
- assertArrayEquals(Array[Byte](0, 0, 0, 0, 0, 0, 0, 13, 0, 0, 0, 0, 0, 0, 0, 42), bytes)
- assertEquals(uuid, serde.fromBytes(bytes))
- }
-
- @Test
- def testToBytesWhenNull {
- assertEquals(null, serde.toBytes(null))
- }
-
- @Test
- def testFromBytesWhenNull {
- assertEquals(null, serde.fromBytes(null))
- }
-
- @Test(expected = classOf[BufferUnderflowException])
- def testFromBytesWhenInvalid {
- assertEquals(null, serde.fromBytes(Array[Byte](0)))
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
index 0f0d792..5824489 100644
--- a/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
+++ b/samza-log4j/src/main/java/org/apache/samza/config/Log4jSystemConfig.java
@@ -77,7 +77,7 @@ public class Log4jSystemConfig extends JavaSystemConfig {
* supplied serde name.
*/
public String getSerdeClass(String name) {
- return get(String.format(SerializerConfig.SERDE(), name), null);
+ return get(String.format(SerializerConfig.SERDE_FACTORY_CLASS(), name), null);
}
public String getStreamSerdeName(String systemName, String streamName) {
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
----------------------------------------------------------------------
diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
index e442599..8436835 100644
--- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
+++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/StreamAppender.java
@@ -246,7 +246,7 @@ public class StreamAppender extends AppenderSkeleton {
SerdeFactory<LoggingEvent> serdeFactory = Util.<SerdeFactory<LoggingEvent>>getObj(serdeClass);
serde = serdeFactory.getSerde(systemName, config);
} else {
- String serdeKey = String.format(SerializerConfig.SERDE(), serdeName);
+ String serdeKey = String.format(SerializerConfig.SERDE_FACTORY_CLASS(), serdeName);
throw new SamzaException("Can not find serializers class for key '" + serdeName + "'. Please specify " +
serdeKey + " property");
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-test/src/main/resources/log4j.xml
----------------------------------------------------------------------
diff --git a/samza-test/src/main/resources/log4j.xml b/samza-test/src/main/resources/log4j.xml
index 7b4fb82..ab74ebd 100644
--- a/samza-test/src/main/resources/log4j.xml
+++ b/samza-test/src/main/resources/log4j.xml
@@ -12,43 +12,39 @@
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
- <appender name="RollingAppender" class="org.apache.log4j.RollingFileAppender">
- <param name="File" value="${samza.log.dir}/${samza.container.name}.log" />
- <param name="MaxFileSize" value="256MB" />
- <param name="MaxBackupIndex" value="20" />
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
- </layout>
- </appender>
-
- <appender name="StartupAppender" class="org.apache.log4j.RollingFileAppender">
- <param name="File" value="${samza.log.dir}/${samza.container.name}-startup.log" />
- <param name="MaxFileSize" value="256MB" />
- <param name="MaxBackupIndex" value="1" />
- <layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
- </layout>
- </appender>
-
<appender name="console" class="org.apache.log4j.ConsoleAppender">
<param name="Target" value="System.out" />
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss.SSS} [%t] %c{1} [%p] %m%n" />
+ <param name="ConversionPattern" value="[%t] %c{1} [%p] %m%n" />
</layout>
</appender>
<root>
<priority value="info" />
- <appender-ref ref="console" />
- <appender-ref ref="RollingAppender" />
+ <appender-ref ref="console" />
</root>
<logger name="STARTUP_LOGGER" additivity="false">
<level value="info" />
- <appender-ref ref="StartupAppender"/>
+ <appender-ref ref="console"/>
</logger>
<logger name="org.apache.hadoop">
- <level value="off"/>
+ <level value="ERROR"/>
+ </logger>
+ <logger name="org.I0Itec.zkclient">
+ <level value="ERROR"/>
+ </logger>
+ <logger name="org.apache.zookeeper">
+ <level value="ERROR"/>
+ </logger>
+ <logger name="org.apache.samza.system.kafka">
+ <level value="ERROR"/>
+ </logger>
+ <logger name="org.apache.kafka">
+ <level value="ERROR"/>
+ </logger>
+ <logger name="kafka">
+ <level value="ERROR"/>
</logger>
-</log4j:configuration>
+</log4j:configuration>
\ No newline at end of file
[3/4] samza git commit: SAMZA-1109;
Allow setting Serdes for fluent API operators in code
Posted by ja...@apache.org.
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
index 0545af1..9cc5370 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/InputOperatorImpl.java
@@ -18,8 +18,8 @@
*/
package org.apache.samza.operators.impl;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.task.MessageCollector;
@@ -35,13 +35,12 @@ import java.util.Collections;
*
* @param <K> the type of key in the incoming message
* @param <V> the type of message in the incoming message
- * @param <M> the type of input message
*/
-public final class InputOperatorImpl<K, V, M> extends OperatorImpl<Pair<K, V>, M> {
+public final class InputOperatorImpl<K, V> extends OperatorImpl<KV<K, V>, Object> { // Object == KV<K,V> | V
- private final InputOperatorSpec<K, V, M> inputOpSpec;
+ private final InputOperatorSpec<K, V> inputOpSpec;
- InputOperatorImpl(InputOperatorSpec<K, V, M> inputOpSpec) {
+ InputOperatorImpl(InputOperatorSpec<K, V> inputOpSpec) {
this.inputOpSpec = inputOpSpec;
}
@@ -50,9 +49,8 @@ public final class InputOperatorImpl<K, V, M> extends OperatorImpl<Pair<K, V>, M
}
@Override
- public Collection<M> handleMessage(Pair<K, V> pair, MessageCollector collector, TaskCoordinator coordinator) {
- // TODO: SAMZA-1148 - Cast to appropriate input (key, msg) types based on the serde before applying the msgBuilder.
- M message = this.inputOpSpec.getMsgBuilder().apply(pair.getKey(), pair.getValue());
+ public Collection<Object> handleMessage(KV<K, V> pair, MessageCollector collector, TaskCoordinator coordinator) {
+ Object message = this.inputOpSpec.isKeyedInput() ? pair : pair.getValue();
return Collections.singletonList(message);
}
@@ -60,7 +58,7 @@ public final class InputOperatorImpl<K, V, M> extends OperatorImpl<Pair<K, V>, M
protected void handleClose() {
}
- protected OperatorSpec<Pair<K, V>, M> getOperatorSpec() {
+ protected OperatorSpec<KV<K, V>, Object> getOperatorSpec() {
return this.inputOpSpec;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
index 8dd5acd..e353ac4 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
@@ -18,6 +18,7 @@
*/
package org.apache.samza.operators.impl;
+import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.config.MetricsConfig;
import org.apache.samza.metrics.Counter;
@@ -37,6 +38,9 @@ import java.util.Set;
/**
* Abstract base class for all stream operator implementations.
+ *
+ * @param <M> type of the input to this operator
+ * @param <RM> type of the results of applying this operator
*/
public abstract class OperatorImpl<M, RM> {
private static final String METRICS_GROUP = OperatorImpl.class.getName();
@@ -113,7 +117,19 @@ public abstract class OperatorImpl<M, RM> {
public final void onMessage(M message, MessageCollector collector, TaskCoordinator coordinator) {
this.numMessage.inc();
long startNs = this.highResClock.nanoTime();
- Collection<RM> results = handleMessage(message, collector, coordinator);
+ Collection<RM> results;
+ try {
+ results = handleMessage(message, collector, coordinator);
+ } catch (ClassCastException e) {
+ String actualType = e.getMessage().replaceFirst(" cannot be cast to .*", "");
+ String expectedType = e.getMessage().replaceFirst(".* cannot be cast to ", "");
+ throw new SamzaException(
+ String.format("Error applying operator %s (created at %s) to its input message. "
+ + "Expected input message to be of type %s, but found it to be of type %s. "
+ + "Are Serdes for the inputs to this operator configured correctly?",
+ getOperatorName(), getOperatorSpec().getSourceLocation(), expectedType, actualType), e);
+ }
+
long endNs = this.highResClock.nanoTime();
this.handleMessageNs.update(endNs - startNs);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index 99496eb..faedfc9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -19,8 +19,8 @@
package org.apache.samza.operators.impl;
import com.google.common.collect.Lists;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.operators.functions.PartialJoinFunction;
@@ -28,8 +28,9 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputOperatorSpec;
-import org.apache.samza.operators.spec.StreamOperatorSpec;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
import org.apache.samza.operators.spec.SinkOperatorSpec;
+import org.apache.samza.operators.spec.StreamOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.util.InternalInMemoryStore;
import org.apache.samza.storage.kv.KeyValueStore;
@@ -69,7 +70,7 @@ public class OperatorImplGraph {
* the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} with each other since they're
* reached from different {@link OperatorSpec} during DAG traversals.
*/
- private final Map<Integer, Pair<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
+ private final Map<Integer, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>();
private final Clock clock;
@@ -167,6 +168,8 @@ public class OperatorImplGraph {
return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context);
} else if (operatorSpec instanceof OutputOperatorSpec) {
return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, config, context);
+ } else if (operatorSpec instanceof PartitionByOperatorSpec) {
+ return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, config, context);
} else if (operatorSpec instanceof WindowOperatorSpec) {
return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
} else if (operatorSpec instanceof JoinOperatorSpec) {
@@ -178,19 +181,19 @@ public class OperatorImplGraph {
private PartialJoinOperatorImpl createPartialJoinOperatorImpl(OperatorSpec prevOperatorSpec,
JoinOperatorSpec joinOpSpec, Config config, TaskContext context, Clock clock) {
- Pair<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec);
+ KV<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec);
if (joinOpSpec.getLeftInputOpSpec().equals(prevOperatorSpec)) { // we got here from the left side of the join
return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ true,
- partialJoinFunctions.getLeft(), partialJoinFunctions.getRight(), config, context, clock);
+ partialJoinFunctions.getKey(), partialJoinFunctions.getValue(), config, context, clock);
} else { // we got here from the right side of the join
return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ false,
- partialJoinFunctions.getRight(), partialJoinFunctions.getLeft(), config, context, clock);
+ partialJoinFunctions.getValue(), partialJoinFunctions.getKey(), config, context, clock);
}
}
- private Pair<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
+ private KV<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) {
return joinFunctions.computeIfAbsent(joinOpSpec.getOpId(),
- joinOpId -> Pair.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn())));
+ joinOpId -> KV.of(createLeftJoinFn(joinOpSpec.getJoinFn()), createRightJoinFn(joinOpSpec.getJoinFn())));
}
private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinFunction joinFn) {
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index fe59b74..7b7e49c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -19,6 +19,7 @@
package org.apache.samza.operators.impl;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputOperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
@@ -38,11 +39,14 @@ import java.util.Collections;
class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
private final OutputOperatorSpec<M> outputOpSpec;
- private final OutputStreamImpl<?, ?, M> outputStream;
+ private final OutputStreamImpl<M> outputStream;
+ private final SystemStream systemStream;
OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) {
this.outputOpSpec = outputOpSpec;
this.outputStream = outputOpSpec.getOutputStream();
+ this.systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
+ outputStream.getStreamSpec().getPhysicalName());
}
@Override
@@ -52,12 +56,16 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
@Override
public Collection<Void> handleMessage(M message, MessageCollector collector,
TaskCoordinator coordinator) {
- // TODO: SAMZA-1148 - need to find a way to directly pass in the serde class names
- SystemStream systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(),
- outputStream.getStreamSpec().getPhysicalName());
- Object key = outputStream.getKeyExtractor().apply(message);
- Object msg = outputStream.getMsgExtractor().apply(message);
- collector.send(new OutgoingMessageEnvelope(systemStream, key, msg));
+ Object key, value;
+ if (outputStream.isKeyedOutput()) {
+ key = ((KV) message).getKey();
+ value = ((KV) message).getValue();
+ } else {
+ key = null;
+ value = message;
+ }
+
+ collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
new file mode 100644
index 0000000..072b31d
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.impl;
+
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
+import org.apache.samza.operators.spec.OperatorSpec;
+import org.apache.samza.operators.spec.OutputStreamImpl;
+import org.apache.samza.operators.spec.PartitionByOperatorSpec;
+import org.apache.samza.system.OutgoingMessageEnvelope;
+import org.apache.samza.system.SystemStream;
+import org.apache.samza.task.MessageCollector;
+import org.apache.samza.task.TaskContext;
+import org.apache.samza.task.TaskCoordinator;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+
+
+/**
+ * An operator that sends messages to an output {@link SystemStream} for repartitioning them.
+ */
+class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
+
+ private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec;
+ private final SystemStream systemStream;
+ private final Function<? super M, ? extends K> keyFunction;
+ private final Function<? super M, ? extends V> valueFunction;
+
+ PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, Config config, TaskContext context) {
+ this.partitionByOpSpec = partitionByOpSpec;
+ OutputStreamImpl<KV<K, V>> outputStream = partitionByOpSpec.getOutputStream();
+ if (!outputStream.isKeyedOutput()) {
+ throw new SamzaException("Output stream for repartitioning must be a keyed stream.");
+ }
+ this.systemStream = new SystemStream(
+ outputStream.getStreamSpec().getSystemName(),
+ outputStream.getStreamSpec().getPhysicalName());
+ this.keyFunction = partitionByOpSpec.getKeyFunction();
+ this.valueFunction = partitionByOpSpec.getValueFunction();
+ }
+
+ @Override
+ protected void handleInit(Config config, TaskContext context) {
+ }
+
+ @Override
+ public Collection<Void> handleMessage(M message, MessageCollector collector,
+ TaskCoordinator coordinator) {
+ K key = keyFunction.apply(message);
+ V value = valueFunction.apply(message);
+ collector.send(new OutgoingMessageEnvelope(systemStream, null, key, value));
+ return Collections.emptyList();
+ }
+
+ @Override
+ protected void handleClose() {
+ }
+
+ @Override
+ protected OperatorSpec<M, Void> getOperatorSpec() {
+ return partitionByOpSpec;
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
index f9485f7..736d71e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java
@@ -137,6 +137,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
@Override
public Collection<WindowPane<WK, WV>> handleTimer(MessageCollector collector, TaskCoordinator coordinator) {
+ LOG.trace("Processing timer.");
List<WindowPane<WK, WV>> results = new ArrayList<>();
List<TriggerKey<WK>> keys = triggerScheduler.runPendingCallbacks();
@@ -148,7 +149,7 @@ public class WindowOperatorImpl<M, WK, WV> extends OperatorImpl<M, WindowPane<WK
maybeTriggeredPane.ifPresent(results::add);
}
}
-
+ LOG.trace("Triggered panes: " + results.size());
return results;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index 6fbc3c1..2749245 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -18,35 +18,46 @@
*/
package org.apache.samza.operators.spec;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.samza.operators.KV;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;
-import java.util.function.BiFunction;
-
/**
* The spec for an operator that receives incoming messages from an input stream
* and converts them to the input message.
*
- * @param <K> the type of key in the incoming message
- * @param <V> the type of message in the incoming message
- * @param <M> the type of input message
+ * @param <K> the type of input key
+ * @param <V> the type of input value
*/
-public class InputOperatorSpec<K, V, M> extends OperatorSpec<Pair<K, V>, M> {
+public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // Object == KV<K, V> | V
private final StreamSpec streamSpec;
- private final BiFunction<K, V, M> msgBuilder;
+ private final Serde<K> keySerde;
+ private final Serde<V> valueSerde;
+ private final boolean isKeyedInput;
- public InputOperatorSpec(StreamSpec streamSpec, BiFunction<K, V, M> msgBuilder, int opId) {
+ public InputOperatorSpec(StreamSpec streamSpec,
+ Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyedInput, int opId) {
super(OpCode.INPUT, opId);
this.streamSpec = streamSpec;
- this.msgBuilder = msgBuilder;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ this.isKeyedInput = isKeyedInput;
}
public StreamSpec getStreamSpec() {
return this.streamSpec;
}
- public BiFunction<K, V, M> getMsgBuilder() {
- return this.msgBuilder;
+ public Serde<K> getKeySerde() {
+ return keySerde;
+ }
+
+ public Serde<V> getValueSerde() {
+ return valueSerde;
+ }
+
+ public boolean isKeyedInput() {
+ return isKeyedInput;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
index f64e123..bcb0485 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java
@@ -19,6 +19,8 @@
package org.apache.samza.operators.spec;
import org.apache.samza.annotation.InterfaceStability;
+import org.apache.samza.operators.MessageStream;
+import org.apache.samza.operators.MessageStreamImpl;
import java.util.Collection;
import java.util.LinkedHashSet;
@@ -105,9 +107,23 @@ public abstract class OperatorSpec<M, OM> {
// [2] SomeOperatorSpec.<init>()
// [3] OperatorSpecs.createSomeOperatorSpec()
// [4] MessageStreamImpl.someOperator()
- // [5] User code that calls [4]
- // we are interested in [5] here
+ // [5] User/MessageStreamImpl code that calls [4]
+ // We are interested in the first call below this that originates from user code
StackTraceElement element = this.creationStackTrace[5];
+
+ /**
+ * Sometimes [5] above is a call from MessageStream/MessageStreamImpl itself (e.g. for
+ * {@link org.apache.samza.operators.MessageStream#mergeAll(Collection)} or
+ * {@link MessageStreamImpl#partitionBy(Function, Function)}).
+ * If that's the case, find the first call from a class other than these.
+ */
+ for (int i = 5; i < creationStackTrace.length; i++) {
+ if (!creationStackTrace[i].getClassName().equals(MessageStreamImpl.class.getName())
+ && !creationStackTrace[i].getClassName().equals(MessageStream.class.getName())) {
+ element = creationStackTrace[i];
+ break;
+ }
+ }
return String.format("%s:%s", element.getFileName(), element.getLineNumber());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index ed5fc8f..e67179e 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -20,6 +20,7 @@
package org.apache.samza.operators.spec;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.functions.FilterFunction;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.operators.functions.JoinFunction;
@@ -30,6 +31,7 @@ import org.apache.samza.task.TaskContext;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.function.Function;
/**
@@ -140,29 +142,29 @@ public class OperatorSpecs {
*
* @param outputStream the {@link OutputStreamImpl} to send messages to
* @param opId the unique ID of the operator
- * @param <K> the type of key in the outgoing message
- * @param <V> the type of message in the outgoing message
* @param <M> the type of message in the {@link OutputStreamImpl}
* @return the {@link OutputOperatorSpec} for the sendTo operator
*/
- public static <K, V, M> OutputOperatorSpec<M> createSendToOperatorSpec(
- OutputStreamImpl<K, V, M> outputStream, int opId) {
- return new OutputOperatorSpec<>(outputStream, OperatorSpec.OpCode.SEND_TO, opId);
+ public static <M> OutputOperatorSpec<M> createSendToOperatorSpec(OutputStreamImpl<M> outputStream, int opId) {
+ return new OutputOperatorSpec<>(outputStream, opId);
}
/**
- * Creates a {@link OutputOperatorSpec} for the partitionBy operator.
+ * Creates a {@link PartitionByOperatorSpec} for the partitionBy operator.
*
+ * @param <M> the type of messages being repartitioned
+ * @param <K> the type of key in the repartitioned {@link OutputStreamImpl}
+ * @param <V> the type of value in the repartitioned {@link OutputStreamImpl}
* @param outputStream the {@link OutputStreamImpl} to send messages to
+ * @param keyFunction the {@link Function} for extracting the key from the message
+ * @param valueFunction the {@link Function} for extracting the value from the message
* @param opId the unique ID of the operator
- * @param <K> the type of key in the outgoing message
- * @param <V> the type of message in the outgoing message
- * @param <M> the type of message in the {@link OutputStreamImpl}
* @return the {@link OutputOperatorSpec} for the partitionBy operator
*/
- public static <K, V, M> OutputOperatorSpec<M> createPartitionByOperatorSpec(
- OutputStreamImpl<K, V, M> outputStream, int opId) {
- return new OutputOperatorSpec<>(outputStream, OperatorSpec.OpCode.PARTITION_BY, opId);
+ public static <M, K, V> PartitionByOperatorSpec<M, K, V> createPartitionByOperatorSpec(
+ OutputStreamImpl<KV<K, V>> outputStream, Function<? super M, ? extends K> keyFunction,
+ Function<? super M, ? extends V> valueFunction, int opId) {
+ return new PartitionByOperatorSpec<>(outputStream, keyFunction, valueFunction, opId);
}
/**
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
index e6767ec..fc88634 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputOperatorSpec.java
@@ -29,19 +29,16 @@ package org.apache.samza.operators.spec;
*/
public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
- private OutputStreamImpl<?, ?, M> outputStream;
-
+ private final OutputStreamImpl<M> outputStream;
/**
* Constructs an {@link OutputOperatorSpec} to send messages to the provided {@code outStream}
*
* @param outputStream the {@link OutputStreamImpl} to send messages to
- * @param opCode the specific {@link OpCode} for this {@link SinkOperatorSpec}.
- * It could be {@link OpCode#SEND_TO}, or {@link OpCode#PARTITION_BY}
* @param opId the unique ID of this {@link SinkOperatorSpec} in the graph
*/
- OutputOperatorSpec(OutputStreamImpl<?, ?, M> outputStream, OperatorSpec.OpCode opCode, int opId) {
- super(opCode, opId);
+ OutputOperatorSpec(OutputStreamImpl<M> outputStream, int opId) {
+ super(OpCode.SEND_TO, opId);
this.outputStream = outputStream;
}
@@ -49,7 +46,7 @@ public class OutputOperatorSpec<M> extends OperatorSpec<M, Void> {
* The {@link OutputStreamImpl} that this operator is sending its output to.
* @return the {@link OutputStreamImpl} for this operator if any, else null.
*/
- public OutputStreamImpl<?, ?, M> getOutputStream() {
+ public OutputStreamImpl<M> getOutputStream() {
return this.outputStream;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
index 5506378..a793e0c 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
@@ -19,32 +19,38 @@
package org.apache.samza.operators.spec;
import org.apache.samza.operators.OutputStream;
+import org.apache.samza.serializers.Serde;
import org.apache.samza.system.StreamSpec;
-import java.util.function.Function;
-public class OutputStreamImpl<K, V, M> implements OutputStream<K, V, M> {
+public class OutputStreamImpl<M> implements OutputStream<M> {
private final StreamSpec streamSpec;
- private final Function<M, K> keyExtractor;
- private final Function<M, V> msgExtractor;
+ private final Serde keySerde;
+ private final Serde valueSerde;
+ private final boolean isKeyedOutput;
public OutputStreamImpl(StreamSpec streamSpec,
- Function<M, K> keyExtractor, Function<M, V> msgExtractor) {
+ Serde keySerde, Serde valueSerde, boolean isKeyedOutput) {
this.streamSpec = streamSpec;
- this.keyExtractor = keyExtractor;
- this.msgExtractor = msgExtractor;
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ this.isKeyedOutput = isKeyedOutput;
}
public StreamSpec getStreamSpec() {
return streamSpec;
}
- public Function<M, K> getKeyExtractor() {
- return keyExtractor;
+ public Serde getKeySerde() {
+ return keySerde;
}
- public Function<M, V> getMsgExtractor() {
- return msgExtractor;
+ public Serde getValueSerde() {
+ return valueSerde;
+ }
+
+ public boolean isKeyedOutput() {
+ return isKeyedOutput;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
new file mode 100644
index 0000000..a2bb5f2
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.operators.spec;
+
+import org.apache.samza.operators.KV;
+
+import java.util.function.Function;
+
+
+/**
+ * The spec for an operator that re-partitions a {@link org.apache.samza.operators.MessageStream} to a
+ * {@link org.apache.samza.system.SystemStream}. This is usually paired with a corresponding
+ * {@link InputOperatorSpec} that consumes the {@link org.apache.samza.system.SystemStream} again.
+ * <p>
+ * This is a terminal operator and does not allow further operator chaining.
+ *
+ * @param <M> the type of message
+ * @param <K> the type of key in the message
+ * @param <V> the type of value in the message
+ */
+public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> {
+
+ private final OutputStreamImpl<KV<K, V>> outputStream;
+ private final Function<? super M, ? extends K> keyFunction;
+ private final Function<? super M, ? extends V> valueFunction;
+
+ /**
+ * Constructs a {@link PartitionByOperatorSpec} to send messages to the provided {@code outputStream}
+ *
+ * @param outputStream the {@link OutputStreamImpl} to send messages to
+ * @param keyFunction the {@link Function} for extracting the key from the message
+ * @param valueFunction the {@link Function} for extracting the value from the message
+ * @param opId the unique ID of this {@link PartitionByOperatorSpec} in the graph
+ */
+ PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream,
+ Function<? super M, ? extends K> keyFunction,
+ Function<? super M, ? extends V> valueFunction, int opId) {
+ super(OpCode.PARTITION_BY, opId);
+ this.outputStream = outputStream;
+ this.keyFunction = keyFunction;
+ this.valueFunction = valueFunction;
+ }
+
+ /**
+ * The {@link OutputStreamImpl} that this operator is sending its output to.
+ * @return the {@link OutputStreamImpl} for this operator if any, else null.
+ */
+ public OutputStreamImpl<KV<K, V>> getOutputStream() {
+ return this.outputStream;
+ }
+
+ public Function<? super M, ? extends K> getKeyFunction() {
+ return keyFunction;
+ }
+
+ public Function<? super M, ? extends V> getValueFunction() {
+ return valueFunction;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index f0bb1dc..279cdd4 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -22,6 +22,7 @@ import org.apache.samza.operators.MessageStreamImpl;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.spec.InputOperatorSpec;
+import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.system.StreamSpec;
@@ -30,21 +31,19 @@ import org.apache.samza.system.StreamSpec;
* <p>
* This implementation accepts a pair of {@link InputOperatorSpec} and {@link OutputStreamImpl} associated
* with the same logical {@code streamId}. It provides access to its {@link OutputStreamImpl} for
- * {@link MessageStreamImpl#partitionBy} to send messages out to. It's also a {@link MessageStreamImpl} with
+ * the partitionBy operator to send messages out to. It's also a {@link MessageStreamImpl} with
* {@link InputOperatorSpec} as its operator spec, so that further operations can be chained on the
* {@link InputOperatorSpec}.
*
- * @param <K> the type of key in the outgoing/incoming message
- * @param <V> the type of message in the outgoing/incoming message
* @param <M> the type of message in the output {@link MessageStreamImpl}
*/
-public class IntermediateMessageStreamImpl<K, V, M> extends MessageStreamImpl<M> implements OutputStream<K, V, M> {
+public class IntermediateMessageStreamImpl<M> extends MessageStreamImpl<M> implements OutputStream<M> {
- private final OutputStreamImpl<K, V, M> outputStream;
+ private final OutputStreamImpl<M> outputStream;
- public IntermediateMessageStreamImpl(StreamGraphImpl graph, InputOperatorSpec<K, V, M> inputOperatorSpec,
- OutputStreamImpl<K, V, M> outputStream) {
- super(graph, inputOperatorSpec);
+ public IntermediateMessageStreamImpl(StreamGraphImpl graph, InputOperatorSpec<?, M> inputOperatorSpec,
+ OutputStreamImpl<M> outputStream) {
+ super(graph, (OperatorSpec<?, M>) inputOperatorSpec);
this.outputStream = outputStream;
}
@@ -52,7 +51,7 @@ public class IntermediateMessageStreamImpl<K, V, M> extends MessageStreamImpl<M>
return this.outputStream.getStreamSpec();
}
- public OutputStreamImpl<K, V, M> getOutputStream() {
+ public OutputStreamImpl<M> getOutputStream() {
return this.outputStream;
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
index 26ef92c..45ce9aa 100644
--- a/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
+++ b/samza-core/src/main/java/org/apache/samza/serializers/IntermediateMessageSerde.java
@@ -20,11 +20,13 @@
package org.apache.samza.serializers;
import java.util.Arrays;
+
import org.apache.samza.SamzaException;
import org.apache.samza.message.EndOfStreamMessage;
import org.apache.samza.message.MessageType;
import org.apache.samza.message.WatermarkMessage;
-import org.codehaus.jackson.type.TypeReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -49,28 +51,7 @@ import org.codehaus.jackson.type.TypeReference;
* For control message, we use json serde.
*/
public class IntermediateMessageSerde implements Serde<Object> {
-
- private static final class WatermarkSerde extends JsonSerde<WatermarkMessage> {
- @Override
- public WatermarkMessage fromBytes(byte[] bytes) {
- try {
- return mapper().readValue(new String(bytes, "UTF-8"), new TypeReference<WatermarkMessage>() { });
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
- }
-
- private static final class EndOfStreamSerde extends JsonSerde<EndOfStreamMessage> {
- @Override
- public EndOfStreamMessage fromBytes(byte[] bytes) {
- try {
- return mapper().readValue(new String(bytes, "UTF-8"), new TypeReference<EndOfStreamMessage>() { });
- } catch (Exception e) {
- throw new SamzaException(e);
- }
- }
- }
+ private static final Logger LOGGER = LoggerFactory.getLogger(IntermediateMessageSerde.class);
private final Serde userMessageSerde;
private final Serde<WatermarkMessage> watermarkSerde;
@@ -78,8 +59,8 @@ public class IntermediateMessageSerde implements Serde<Object> {
public IntermediateMessageSerde(Serde userMessageSerde) {
this.userMessageSerde = userMessageSerde;
- this.watermarkSerde = new WatermarkSerde();
- this.eosSerde = new EndOfStreamSerde();
+ this.watermarkSerde = new JsonSerdeV2<>(WatermarkMessage.class);
+ this.eosSerde = new JsonSerdeV2<>(EndOfStreamMessage.class);
}
@Override
@@ -110,7 +91,13 @@ public class IntermediateMessageSerde implements Serde<Object> {
// 1) the first byte is not a valid type so it will cause ArrayOutOfBound exception
// 2) the first byte happens to be a valid type, but the deserialization fails with certain exception
// For these cases, we fall back to user-provided serde
- return userMessageSerde.fromBytes(bytes);
+ try {
+ return userMessageSerde.fromBytes(bytes);
+ } catch (Exception umse) {
+ LOGGER.error("Error deserializing from both intermediate message serde and user message serde. "
+ + "Original exception: ", e);
+ throw umse;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
index a77ef3b..d7c2742 100644
--- a/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
+++ b/samza-core/src/main/java/org/apache/samza/task/StreamOperatorTask.java
@@ -18,10 +18,10 @@
*/
package org.apache.samza.task;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
import org.apache.samza.operators.ContextManager;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.impl.InputOperatorImpl;
import org.apache.samza.operators.impl.OperatorImplGraph;
@@ -105,7 +105,7 @@ public final class StreamOperatorTask implements StreamTask, InitableTask, Windo
SystemStream systemStream = ime.getSystemStreamPartition().getSystemStream();
InputOperatorImpl inputOpImpl = operatorImplGraph.getInputOperator(systemStream);
if (inputOpImpl != null) {
- inputOpImpl.onMessage(Pair.of(ime.getKey(), ime.getMessage()), collector, coordinator);
+ inputOpImpl.onMessage(KV.of(ime.getKey(), ime.getMessage()), collector, coordinator);
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
index 020e3bc..764f77a 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/SerializerConfig.scala
@@ -23,13 +23,15 @@ import scala.collection.JavaConverters._
object SerializerConfig {
// serializer config constants
val SERIALIZER_PREFIX = "serializers.registry.%s"
- val SERDE = "serializers.registry.%s.class"
+ val SERDE_FACTORY_CLASS = "serializers.registry.%s.class"
+ val SERIALIZED_INSTANCE_SUFFIX = ".samza.serialized.instance"
+ val SERDE_SERIALIZED_INSTANCE = SERIALIZER_PREFIX + SERIALIZED_INSTANCE_SUFFIX
implicit def Config2Serializer(config: Config) = new SerializerConfig(config)
}
class SerializerConfig(config: Config) extends ScalaMapConfig(config) {
- def getSerdeClass(name: String) = getOption(SerializerConfig.SERDE format name)
+ def getSerdeClass(name: String) = getOption(SerializerConfig.SERDE_FACTORY_CLASS format name)
/**
* Returns a list of all serializer names from the config file. Useful for
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 628d7f6..45e8e10 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -24,9 +24,8 @@ import java.nio.file.Path
import java.util
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import java.net.{URL, UnknownHostException}
+import java.util.Base64
-import org.apache.samza.serializers.IntermediateMessageSerde
-import org.apache.samza.serializers.StringSerde
import org.apache.samza.{SamzaContainerStatus, SamzaException}
import org.apache.samza.checkpoint.{CheckpointListener, CheckpointManagerFactory, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.config.JobConfig.Config2Job
@@ -50,8 +49,13 @@ import org.apache.samza.metrics.JmxServer
import org.apache.samza.metrics.JvmMetrics
import org.apache.samza.metrics.MetricsRegistryMap
import org.apache.samza.metrics.MetricsReporter
+import org.apache.samza.serializers.IntermediateMessageSerde
+import org.apache.samza.serializers.NoOpSerde
+import org.apache.samza.serializers.SerializableSerde
+import org.apache.samza.serializers.Serde
import org.apache.samza.serializers.SerdeFactory
import org.apache.samza.serializers.SerdeManager
+import org.apache.samza.serializers.StringSerde
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.apache.samza.storage.StorageEngineFactory
import org.apache.samza.storage.TaskStorageManager
@@ -168,10 +172,6 @@ object SamzaContainer extends Logging {
info("Got serde streams: %s" format serdeStreams)
- val serdeNames = config.getSerdeNames
-
- info("Got serde names: %s" format serdeNames)
-
val systemFactories = systemNames.map(systemName => {
val systemFactoryClassName = config
.getSystemFactory(systemName)
@@ -222,7 +222,7 @@ object SamzaContainer extends Logging {
info("Got system producers: %s" format producers.keys)
- val serdes = serdeNames.map(serdeName => {
+ val serdesFromFactories = config.getSerdeNames.map(serdeName => {
val serdeClassName = config
.getSerdeClass(serdeName)
.getOrElse(Util.defaultSerdeFactoryFromSerdeName(serdeName))
@@ -232,8 +232,28 @@ object SamzaContainer extends Logging {
(serdeName, serde)
}).toMap
+ info("Got serdes from factories: %s" format serdesFromFactories.keys)
+
+ val serializableSerde = new SerializableSerde[Serde[Object]]()
+ val serdesFromSerializedInstances = config.subset(SerializerConfig.SERIALIZER_PREFIX format "").asScala
+ .filter { case (key, value) => key.endsWith(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX) }
+ .flatMap { case (key, value) =>
+ val serdeName = key.replace(SerializerConfig.SERIALIZED_INSTANCE_SUFFIX, "")
+ debug(s"Trying to deserialize serde instance for $serdeName")
+ try {
+ val bytes = Base64.getDecoder.decode(value)
+ val serdeInstance = serializableSerde.fromBytes(bytes)
+ debug(s"Returning serialized instance for $serdeName")
+ Some((serdeName, serdeInstance))
+ } catch {
+ case e: Exception =>
+ warn(s"Ignoring invalid serialized instance for $serdeName: $value", e)
+ None
+ }
+ }
+ info("Got serdes from serialized instances: %s" format serdesFromSerializedInstances.keys)
- info("Got serdes: %s" format serdes.keys)
+ val serdes = serdesFromFactories ++ serdesFromSerializedInstances
/*
* A Helper function to build a Map[String, Serde] (systemName -> Serde) for systems defined
@@ -242,11 +262,16 @@ object SamzaContainer extends Logging {
val buildSystemSerdeMap = (getSerdeName: (String) => Option[String]) => {
systemNames
.filter(systemName => getSerdeName(systemName).isDefined)
- .map(systemName => {
+ .flatMap(systemName => {
val serdeName = getSerdeName(systemName).get
val serde = serdes.getOrElse(serdeName,
throw new SamzaException("buildSystemSerdeMap: No class defined for serde: %s." format serdeName))
- (systemName, serde)
+
+ // this shouldn't happen since system level serdes can't be set programmatically using the high level
+ // API, but adding this for safety.
+ Option(serde)
+ .filter(!_.isInstanceOf[NoOpSerde[Any]])
+ .map(serde => (systemName, serde))
}).toMap
}
@@ -257,11 +282,15 @@ object SamzaContainer extends Logging {
val buildSystemStreamSerdeMap = (getSerdeName: (SystemStream) => Option[String]) => {
(serdeStreams ++ inputSystemStreamPartitions)
.filter(systemStream => getSerdeName(systemStream).isDefined)
- .map(systemStream => {
+ .flatMap(systemStream => {
val serdeName = getSerdeName(systemStream).get
val serde = serdes.getOrElse(serdeName,
- throw new SamzaException("buildSystemStreamSerdeMap: No class defined for serde: %s." format serdeName))
- (systemStream, serde)
+ throw new SamzaException("buildSystemStreamSerdeMap: No serde found for name: %s." format serdeName))
+
+ // respect explicitly set no-op serdes in high level API
+ Option(serde)
+ .filter(!_.isInstanceOf[NoOpSerde[Any]])
+ .map(serde => (systemStream, serde))
}).toMap
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
deleted file mode 100644
index adb8781..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/ByteBufferSerde.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-import java.nio.ByteBuffer
-
-/**
- * A serializer for ByteBuffers.
- */
-class ByteBufferSerdeFactory extends SerdeFactory[ByteBuffer] {
- def getSerde(name: String, config: Config): Serde[ByteBuffer] = new ByteBufferSerde
-}
-
-class ByteBufferSerde extends Serde[ByteBuffer] {
- def toBytes(byteBuffer: ByteBuffer) = {
- if (byteBuffer != null) {
- val bytes = new Array[Byte](byteBuffer.remaining())
- byteBuffer.duplicate().get(bytes)
- bytes
- } else {
- null
- }
- }
-
- def fromBytes(bytes: Array[Byte]) = if (bytes != null) {
- ByteBuffer.wrap(bytes)
- } else {
- null
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
deleted file mode 100644
index 968da26..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/ByteSerde.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for bytes that is effectively a no-op but can be useful for
- * binary messages.
- */
-class ByteSerdeFactory extends SerdeFactory[Array[Byte]] {
- def getSerde(name: String, config: Config): Serde[Array[Byte]] = new ByteSerde
-}
-
-class ByteSerde extends Serde[Array[Byte]] {
- def toBytes(bytes: Array[Byte]) = bytes
-
- def fromBytes(bytes: Array[Byte]) = bytes
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
deleted file mode 100644
index 7981d2c..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/DoubleSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for doubles
- */
-class DoubleSerdeFactory extends SerdeFactory[java.lang.Double] {
- def getSerde(name: String, config: Config): Serde[java.lang.Double] = new DoubleSerde
-}
-
-class DoubleSerde extends Serde[java.lang.Double] {
- def toBytes(obj: java.lang.Double): Array[Byte] = if (obj != null) {
- ByteBuffer.allocate(8).putDouble(obj.doubleValue()).array
- } else {
- null
- }
-
- // big-endian by default
- def fromBytes(bytes: Array[Byte]): java.lang.Double = if (bytes != null) {
- ByteBuffer.wrap(bytes).getDouble
- } else {
- null
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
deleted file mode 100644
index 46509f7..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/IntegerSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for integers
- */
-class IntegerSerdeFactory extends SerdeFactory[java.lang.Integer] {
- def getSerde(name: String, config: Config): Serde[java.lang.Integer] = new IntegerSerde
-}
-
-class IntegerSerde extends Serde[java.lang.Integer] {
- def toBytes(obj: java.lang.Integer): Array[Byte] = if (obj != null) {
- ByteBuffer.allocate(4).putInt(obj.intValue).array
- } else {
- null
- }
-
- // big-endian by default
- def fromBytes(bytes: Array[Byte]): java.lang.Integer = if (bytes != null) {
- ByteBuffer.wrap(bytes).getInt
- } else {
- null
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
index 4b658c1..5ea0f0d 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/JsonSerde.scala
@@ -20,31 +20,45 @@
package org.apache.samza.serializers
import org.apache.samza.SamzaException
+import org.apache.samza.config.Config
import org.apache.samza.serializers.model.SamzaObjectMapper
import org.codehaus.jackson.`type`.TypeReference
-import org.codehaus.jackson.map.ObjectMapper
-import org.apache.samza.config.Config
+import org.slf4j.LoggerFactory
+
+/**
+ * A serializer for JSON strings that
+ * <ol>
+ * <li>
+ * returns a LinkedHashMap<String, Object> upon deserialization.
+ * <li>
+ * enforces the 'dash-separated' property naming convention.
+ * </ol>
+ * JsonSerdeV2 should be preferred over JsonSerde unless JsonSerde was already being used and backwards
+ * compatibility for data with the dasherized name format is required.
+ */
class JsonSerde[T] extends Serde[T] {
- val mapper = SamzaObjectMapper.getObjectMapper()
+ private val LOG = LoggerFactory.getLogger(classOf[JsonSerde[T]])
+ @transient lazy private val mapper = SamzaObjectMapper.getObjectMapper
def toBytes(obj: T): Array[Byte] = {
try {
mapper.writeValueAsString(obj).getBytes("UTF-8")
- }
- catch {
+ } catch {
case e: Exception => throw new SamzaException(e);
}
}
def fromBytes(bytes: Array[Byte]): T = {
+ val str = new String(bytes, "UTF-8")
try {
- mapper.readValue(new String(bytes, "UTF-8"), new TypeReference[T]() {})}
- catch {
- case e: Exception => throw new SamzaException(e);
+ mapper.readValue(str, new TypeReference[T]() {})
+ } catch {
+ case e: Exception =>
+ LOG.debug(s"Error deserializing message: $str", e)
+ throw new SamzaException(e)
}
}
-
}
class JsonSerdeFactory extends SerdeFactory[Object] {
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/LongSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/LongSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/LongSerde.scala
deleted file mode 100644
index 41ff598..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/LongSerde.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import org.apache.samza.config.Config
-
-/**
- * A serializer for longs
- */
-class LongSerdeFactory extends SerdeFactory[java.lang.Long] {
- def getSerde(name: String, config: Config): Serde[java.lang.Long] = new LongSerde
-}
-
-class LongSerde extends Serde[java.lang.Long] {
- def toBytes(obj: java.lang.Long): Array[Byte] = if (obj != null) {
- ByteBuffer.allocate(8).putLong(obj.longValue()).array
- } else {
- null
- }
-
- // big-endian by default
- def fromBytes(bytes: Array[Byte]): java.lang.Long = if (bytes != null) {
- ByteBuffer.wrap(bytes).getLong
- } else {
- null
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala
index 455dd34..8fe3c37 100644
--- a/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala
+++ b/samza-core/src/main/scala/org/apache/samza/serializers/MetricsSnapshotSerde.scala
@@ -20,12 +20,10 @@
package org.apache.samza.serializers
import org.apache.samza.config.Config
import org.codehaus.jackson.map.ObjectMapper
-import java.util.Map
-import java.nio.ByteBuffer
import org.apache.samza.metrics.reporter.MetricsSnapshot
class MetricsSnapshotSerde extends Serde[MetricsSnapshot] {
- val jsonMapper = new ObjectMapper
+ @transient lazy val jsonMapper = new ObjectMapper
def toBytes(obj: MetricsSnapshot) = jsonMapper
.writeValueAsString(obj.getAsMap)
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
deleted file mode 100644
index c43f863..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/SerializableSerde.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.io.ByteArrayInputStream
-import java.io.ByteArrayOutputStream
-import java.io.ObjectInputStream
-import java.io.ObjectOutputStream
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for Serializable
- */
-class SerializableSerdeFactory[T <: java.io.Serializable] extends SerdeFactory[T] {
- def getSerde(name: String, config: Config): Serde[T] =
- new SerializableSerde[T]
-}
-
-class SerializableSerde[T <: java.io.Serializable] extends Serde[T] {
- def toBytes(obj: T): Array[Byte] = if (obj != null) {
- val bos = new ByteArrayOutputStream
- val oos = new ObjectOutputStream(bos)
-
- try {
- oos.writeObject(obj)
- }
- finally {
- oos.close()
- }
-
- bos.toByteArray
- } else {
- null
- }
-
- def fromBytes(bytes: Array[Byte]): T = if (bytes != null) {
- val bis = new ByteArrayInputStream(bytes)
- val ois = new ObjectInputStream(bis)
-
- try {
- ois.readObject.asInstanceOf[T]
- }
- finally{
- ois.close()
- }
- } else {
- null.asInstanceOf[T]
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
deleted file mode 100644
index 6e5f260..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/StringSerde.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for strings
- */
-class StringSerdeFactory extends SerdeFactory[String] {
- def getSerde(name: String, config: Config): Serde[String] =
- new StringSerde(config.get("encoding", "UTF-8"))
-}
-
-class StringSerde(val encoding: String) extends Serde[String] {
- def toBytes(obj: String): Array[Byte] = if (obj != null) {
- obj.toString.getBytes(encoding)
- } else {
- null
- }
-
- def fromBytes(bytes: Array[Byte]): String = if (bytes != null) {
- new String(bytes, 0, bytes.size, encoding)
- } else {
- null
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/main/scala/org/apache/samza/serializers/UUIDSerde.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/serializers/UUIDSerde.scala b/samza-core/src/main/scala/org/apache/samza/serializers/UUIDSerde.scala
deleted file mode 100644
index 88d4327..0000000
--- a/samza-core/src/main/scala/org/apache/samza/serializers/UUIDSerde.scala
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.samza.serializers
-
-import java.nio.ByteBuffer
-import java.util.UUID
-
-import org.apache.samza.config.Config
-
-/**
- * A serializer for UUID
- */
-class UUIDSerdeFactory extends SerdeFactory[UUID] {
- def getSerde(name: String, config: Config): Serde[UUID] = new UUIDSerde
-}
-
-class UUIDSerde() extends Serde[UUID] {
- def toBytes(obj: UUID): Array[Byte] = if (obj != null) {
- ByteBuffer.allocate(16).putLong(obj.getMostSignificantBits).putLong(obj.getLeastSignificantBits).array
- } else {
- null
- }
-
- def fromBytes(bytes: Array[Byte]): UUID = if (bytes != null) {
- val buffer = ByteBuffer.wrap(bytes)
- new UUID(buffer.getLong, buffer.getLong)
- } else {
- null
- }
-}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
index 73a89af..7061732 100644
--- a/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/BroadcastExample.java
@@ -21,10 +21,14 @@ package org.apache.samza.example;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.util.CommandLine;
@@ -35,14 +39,12 @@ public class BroadcastExample implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<PageViewEvent> inputStream =
- graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
- OutputStream<String, PageViewEvent, PageViewEvent> outputStream1 =
- graph.getOutputStream("outputStream1", m -> m.key, m -> m);
- OutputStream<String, PageViewEvent, PageViewEvent> outputStream2 =
- graph.getOutputStream("outputStream2", m -> m.key, m -> m);
- OutputStream<String, PageViewEvent, PageViewEvent> outputStream3 =
- graph.getOutputStream("outputStream3", m -> m.key, m -> m);
+ graph.setDefaultSerde(KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)));
+
+ MessageStream<KV<String, PageViewEvent>> inputStream = graph.getInputStream("inputStream");
+ OutputStream<KV<String, PageViewEvent>> outputStream1 = graph.getOutputStream("outputStream1");
+ OutputStream<KV<String, PageViewEvent>> outputStream2 = graph.getOutputStream("outputStream2");
+ OutputStream<KV<String, PageViewEvent>> outputStream3 = graph.getOutputStream("outputStream3");
inputStream.filter(m -> m.key.equals("key1")).sendTo(outputStream1);
inputStream.filter(m -> m.key.equals("key2")).sendTo(outputStream2);
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
index 5be3046..c75608f 100644
--- a/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/KeyValueStoreExample.java
@@ -21,11 +21,15 @@ package org.apache.samza.example;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.FlatMapFunction;
import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.task.TaskContext;
import org.apache.samza.util.CommandLine;
@@ -42,15 +46,19 @@ import java.util.concurrent.TimeUnit;
public class KeyValueStoreExample implements StreamApplication {
@Override public void init(StreamGraph graph, Config config) {
- MessageStream<PageViewEvent> pageViewEvents = graph.getInputStream(
- "pageViewEventStream", (k, v) -> (PageViewEvent) v);
- OutputStream<String, StatsOutput, StatsOutput> pageViewEventPerMemberStream = graph.getOutputStream(
- "pageViewEventPerMemberStream", statsOutput -> statsOutput.memberId, statsOutput -> statsOutput);
-
- pageViewEvents.
- partitionBy(m -> m.memberId).
- flatMap(new MyStatsCounter()).
- sendTo(pageViewEventPerMemberStream);
+ MessageStream<PageViewEvent> pageViewEvents =
+ graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+ OutputStream<KV<String, StatsOutput>> pageViewEventPerMember =
+ graph.getOutputStream("pageViewEventPerMember",
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(StatsOutput.class)));
+
+ pageViewEvents
+ .partitionBy(pve -> pve.memberId, pve -> pve,
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)))
+ .map(KV::getValue)
+ .flatMap(new MyStatsCounter())
+ .map(stats -> KV.of(stats.memberId, stats))
+ .sendTo(pageViewEventPerMember);
}
// local execution mode
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
index 9fbf6d1..4702c9a 100644
--- a/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/MergeExample.java
@@ -22,23 +22,31 @@ package org.apache.samza.example;
import com.google.common.collect.ImmutableList;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.util.CommandLine;
public class MergeExample implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<Object> inputStream1 = graph.getInputStream("inputStream1", (k, m) -> m);
- MessageStream<Object> inputStream2 = graph.getInputStream("inputStream2", (k, m) -> m);
- MessageStream<Object> inputStream3 = graph.getInputStream("inputStream3", (k, m) -> m);
- OutputStream<Integer, Object, Object> outputStream = graph
- .getOutputStream("outputStream", Object::hashCode, m -> m);
+ graph.setDefaultSerde(new StringSerde());
- MessageStream.mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3))
+ MessageStream<String> inputStream1 = graph.getInputStream("inputStream1");
+ MessageStream<String> inputStream2 = graph.getInputStream("inputStream2");
+ MessageStream<String> inputStream3 = graph.getInputStream("inputStream3");
+ OutputStream<KV<Integer, String>> outputStream =
+ graph.getOutputStream("outputStream", KVSerde.of(new IntegerSerde(), new StringSerde()));
+
+ MessageStream
+ .mergeAll(ImmutableList.of(inputStream1, inputStream2, inputStream3))
+ .map(m -> KV.of(m.hashCode(), m))
.sendTo(outputStream);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
index f65c4ed..95939c4 100644
--- a/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/OrderShipmentJoinExample.java
@@ -20,11 +20,15 @@ package org.apache.samza.example;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.functions.JoinFunction;
import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -36,14 +40,18 @@ public class OrderShipmentJoinExample implements StreamApplication {
@Override
public void init(StreamGraph graph, Config config) {
- MessageStream<OrderRecord> orders = graph.getInputStream("orderStream", (k, m) -> (OrderRecord) m);
- MessageStream<ShipmentRecord> shipments = graph.getInputStream("shipmentStream", (k, m) -> (ShipmentRecord) m);
- OutputStream<String, FulFilledOrderRecord, FulFilledOrderRecord> joinedOrderShipmentStream =
- graph.getOutputStream("joinedOrderShipmentStream", m -> m.orderId, m -> m);
+ MessageStream<OrderRecord> orders =
+ graph.getInputStream("orders", new JsonSerdeV2<>(OrderRecord.class));
+ MessageStream<ShipmentRecord> shipments =
+ graph.getInputStream("shipments", new JsonSerdeV2<>(ShipmentRecord.class));
+ OutputStream<KV<String, FulfilledOrderRecord>> fulfilledOrders =
+ graph.getOutputStream("fulfilledOrders",
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(FulfilledOrderRecord.class)));
orders
.join(shipments, new MyJoinFunction(), Duration.ofMinutes(1))
- .sendTo(joinedOrderShipmentStream);
+ .map(fulFilledOrder -> KV.of(fulFilledOrder.orderId, fulFilledOrder))
+ .sendTo(fulfilledOrders);
}
// local execution mode
@@ -54,10 +62,10 @@ public class OrderShipmentJoinExample implements StreamApplication {
localRunner.run(new OrderShipmentJoinExample());
}
- class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulFilledOrderRecord> {
+ class MyJoinFunction implements JoinFunction<String, OrderRecord, ShipmentRecord, FulfilledOrderRecord> {
@Override
- public FulFilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
- return new FulFilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
+ public FulfilledOrderRecord apply(OrderRecord message, ShipmentRecord otherMessage) {
+ return new FulfilledOrderRecord(message.orderId, message.orderTimeMs, otherMessage.shipTimeMs);
}
@Override
@@ -91,12 +99,12 @@ public class OrderShipmentJoinExample implements StreamApplication {
}
}
- class FulFilledOrderRecord {
+ class FulfilledOrderRecord {
String orderId;
long orderTimeMs;
long shipTimeMs;
- FulFilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
+ FulfilledOrderRecord(String orderId, long orderTimeMs, long shipTimeMs) {
this.orderId = orderId;
this.orderTimeMs = orderTimeMs;
this.shipTimeMs = shipTimeMs;
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
index a3471a2..91657ed 100644
--- a/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/PageViewCounterExample.java
@@ -20,6 +20,7 @@ package org.apache.samza.example;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
@@ -29,6 +30,9 @@ import org.apache.samza.operators.windows.AccumulationMode;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -42,9 +46,10 @@ public class PageViewCounterExample implements StreamApplication {
@Override public void init(StreamGraph graph, Config config) {
MessageStream<PageViewEvent> pageViewEvents =
- graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
- OutputStream<String, PageViewCount, PageViewCount> pageViewEventPerMemberStream = graph
- .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
+ graph.getInputStream("pageViewEventStream", new JsonSerdeV2<>(PageViewEvent.class));
+ OutputStream<KV<String, PageViewCount>> pageViewEventPerMemberStream =
+ graph.getOutputStream("pageViewEventPerMemberStream",
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewCount.class)));
Supplier<Integer> initialValue = () -> 0;
FoldLeftFunction<PageViewEvent, Integer> foldLeftFn = (m, c) -> c + 1;
@@ -52,7 +57,7 @@ public class PageViewCounterExample implements StreamApplication {
.window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofSeconds(10), initialValue, foldLeftFn)
.setEarlyTrigger(Triggers.repeat(Triggers.count(5)))
.setAccumulationMode(AccumulationMode.DISCARDING))
- .map(PageViewCount::new)
+ .map(windowPane -> KV.of(windowPane.getKey().getKey(), new PageViewCount(windowPane)))
.sendTo(pageViewEventPerMemberStream);
}
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
index 7bf939b..e9bb284 100644
--- a/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/RepartitionExample.java
@@ -20,16 +20,19 @@ package org.apache.samza.example;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.Config;
+import org.apache.samza.operators.KV;
import org.apache.samza.operators.MessageStream;
import org.apache.samza.operators.OutputStream;
import org.apache.samza.operators.StreamGraph;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.JsonSerdeV2;
+import org.apache.samza.serializers.KVSerde;
+import org.apache.samza.serializers.StringSerde;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
-import java.util.function.Supplier;
/**
@@ -38,17 +41,18 @@ import java.util.function.Supplier;
public class RepartitionExample implements StreamApplication {
@Override public void init(StreamGraph graph, Config config) {
- Supplier<Integer> initialValue = () -> 0;
MessageStream<PageViewEvent> pageViewEvents =
- graph.getInputStream("pageViewEventStream", (k, m) -> (PageViewEvent) m);
- OutputStream<String, MyStreamOutput, MyStreamOutput> pageViewEventPerMemberStream = graph
- .getOutputStream("pageViewEventPerMemberStream", m -> m.memberId, m -> m);
+ graph.getInputStream("pageViewEvent", new JsonSerdeV2<>(PageViewEvent.class));
+ OutputStream<KV<String, MyStreamOutput>> pageViewEventPerMember =
+ graph.getOutputStream("pageViewEventPerMember",
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(MyStreamOutput.class)));
pageViewEvents
- .partitionBy(m -> m.memberId)
- .window(Windows.keyedTumblingWindow(m -> m.memberId, Duration.ofMinutes(5), initialValue, (m, c) -> c + 1))
- .map(MyStreamOutput::new)
- .sendTo(pageViewEventPerMemberStream);
+ .partitionBy(pve -> pve.memberId, pve -> pve,
+ KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageViewEvent.class)))
+ .window(Windows.keyedTumblingWindow(KV::getKey, Duration.ofMinutes(5), () -> 0, (m, c) -> c + 1))
+ .map(windowPane -> KV.of(windowPane.getKey().getKey(), new MyStreamOutput(windowPane)))
+ .sendTo(pageViewEventPerMember);
}
// local execution mode
http://git-wip-us.apache.org/repos/asf/samza/blob/f16ba269/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
index 1fd3be5..08c896c 100644
--- a/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
+++ b/samza-core/src/test/java/org/apache/samza/example/WindowExample.java
@@ -29,6 +29,8 @@ import org.apache.samza.operators.triggers.Triggers;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.Windows;
import org.apache.samza.runtime.LocalApplicationRunner;
+import org.apache.samza.serializers.IntegerSerde;
+import org.apache.samza.serializers.JsonSerdeV2;
import org.apache.samza.util.CommandLine;
import java.time.Duration;
@@ -45,9 +47,8 @@ public class WindowExample implements StreamApplication {
public void init(StreamGraph graph, Config config) {
Supplier<Integer> initialValue = () -> 0;
FoldLeftFunction<PageViewEvent, Integer> counter = (m, c) -> c == null ? 1 : c + 1;
- MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", (k, m) -> (PageViewEvent) m);
- OutputStream<String, Integer, WindowPane<Void, Integer>> outputStream = graph
- .getOutputStream("outputStream", m -> m.getKey().getPaneId(), m -> m.getMessage());
+ MessageStream<PageViewEvent> inputStream = graph.getInputStream("inputStream", new JsonSerdeV2<PageViewEvent>());
+ OutputStream<Integer> outputStream = graph.getOutputStream("outputStream", new IntegerSerde());
// create a tumbling window that outputs the number of messages collected every 10 minutes.
// also emit early results if either the number of messages collected reaches 30000, or if no new messages arrive
@@ -55,6 +56,7 @@ public class WindowExample implements StreamApplication {
inputStream
.window(Windows.tumblingWindow(Duration.ofMinutes(10), initialValue, counter)
.setLateTrigger(Triggers.any(Triggers.count(30000), Triggers.timeSinceLastMessage(Duration.ofMinutes(1)))))
+ .map(WindowPane::getMessage)
.sendTo(outputStream);
}